hadoop-streaming万能脚本模板

hadoop-streaming万能脚本模板,第1张

hadoop-streaming万能脚本模板 shell相关比较逻辑运算语句

shell 中用 [ ] 或 test 比较整数时,不能直接使用 >、=、< 等符号,而要使用下面这些比较运算符:

  -eq     // equal 等于

  -ne     // not equal 不等于

  -gt     // greater than 大于

  -lt     // less than 小于

  -ge     // greater than or equal 大于等于,注意同样需要带"-"

  -le     // less than or equal 小于等于,注意同样需要带"-"

shell打印日志的方法

# Print a timestamped debug line.
# Arguments: $1 - message text (backslash escapes are interpreted by echo -e)
# Outputs:   "[DEBUG <YYYYmmdd-HH:MM:SS>]<message>" on stdout
log(){
        local logstr=$1
        local now
        now=$(date "+%Y%m%d-%H:%M:%S")
        # Quoted so that runs of whitespace in the message are kept as written.
        echo -e "[DEBUG ${now}]${logstr}"
}

shell判断MR任务是否执行成功的代码

# Poll HDFS until <dir>/_SUCCESS exists or the poll budget is used up.
# Globals:   HADOOP (read) - hadoop launcher; log() must be defined.
# Arguments: $1 - HDFS directory expected to contain _SUCCESS
#            $2 - maximum number of polls
#            $3 - pause between polls, passed to sleep (e.g. "5m")
# On timeout: with a budget of 36 an "exit" alarm is posted and the script
#            exits 255; with a budget of 6 a "wait" alarm is posted and the
#            function returns. Other budgets time out silently.
function hdp_succ(){
    local hdfiles=$1
    local max=$2
    local waittime=$3
    local i
    for ((i = 0; i < max; i += 1)); do
        if "${HADOOP}" fs -test -e "${hdfiles}/_SUCCESS"; then
            break
        fi
        log "waiting..."
        sleep "$waittime"
    done
    # i reaches max only when every poll failed.
    if (( i == max && max == 36 )); then
        curl --data-urlencode "subject=gpu2_deepfm:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
        exit 255
    fi
    if (( i == max && max == 6 )); then
        curl --data-urlencode "subject=gpu2_deepfm:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
    fi
}

# Wait for the training and evaluation inputs: 36 polls, "5m" apart.
# The paths are quoted so an empty variable cannot shift the other arguments.
hdp_succ "${hdfs_traindata}" 36 "5m"
hdp_succ "${hdfs_evaldata}" 36 "5m"

shell将hadoop集群上的文件拉到本地的模块

# Re-create the local directory and download every "part" file found under
# ${hdfs_traindata}.
# The :? guard aborts instead of running rm -rf with an empty path.
rm -rf -- "${train_data:?train_data must be set}"
mkdir -- "$train_data"
# The file path is the last column of each `hadoop fs -ls` line.
files=$("$HADOOP" fs -ls "${hdfs_traindata}" | awk '{print $NF}' | grep 'part')
# $files is split on whitespace on purpose: one HDFS path per word.
for file_name in $files
do
    echo "$file_name"
    "$HADOOP" fs -get "$file_name" "${train_data}/."
done

shell工程化经验总结

获取日期

# Date stamps (GNU date): today, and yesterday in dashed and compact forms.
TODAY=$(date +%Y%m%d)
lastday=$(date -d "-1 day" +"%Y-%m-%d")
lastday_v2=$(date -d "-1 day" +"%Y%m%d")

hadoop-streaming的多路输入

# Collect 30 consecutive day directories, starting i days back, into one
# space-separated list. Each new day is prepended, so the oldest comes first.
INPUT_PAT=""
k=$((i + 0))
while ((k < i + 30)); do
    day_tmp=$(date +%Y%m%d -d "$k days ago")
    INPUT_PAT="${hdfs_feedback_log_dir}/${day_tmp} ${INPUT_PAT}"
    ((k += 1))
done

# INPUT_PAT holds several space-separated paths and is left unquoted so that
# each path becomes its own word.
# NOTE(review): confirm the streaming jar in use accepts several paths after a
# single -input.
# shellcheck disable=SC2086
hadoop streaming \
    -input $INPUT_PAT

hadoop-streaming的reducer通用(bi分析)

def handle_line(segs, infos):
    """Add the numeric columns of one record onto the running totals.

    ``segs[0]`` is not read; ``segs[k + 1]`` is converted with ``int`` and
    added to ``infos[k]``. ``infos`` is updated in place and also returned.
    """
    for pos, _ in enumerate(infos):
        infos[pos] += int(segs[pos + 1])
    return infos

def print_line(last_id, infos):
    """Print one output record: the key, then each total, separated by tabs.

    ``last_id`` must already be a string; every element of ``infos`` is
    converted with ``str``.
    """
    print('\t'.join([last_id] + [str(value) for value in infos]))

def reducer1():
    """Sum the numeric columns of consecutive stdin records that share a key.

    Each non-empty input line is split on tabs; the first field is the key and
    the remaining fields are added column-wise by ``handle_line``. Whenever
    the key changes, the finished group is written with ``print_line``. The
    last group is written at end of input; nothing is written when no
    non-empty line was read.
    """
    import sys

    last_id = ""
    infos = []
    flag = False  # becomes True once the first record has been seen
    for line in sys.stdin:
        line = line.rstrip("\n")
        if not line:
            continue
        try:
            segs = line.split("\t")
            if not flag:
                last_id = segs[0]
                infos = [0] * (len(segs) - 1)
            if segs[0] == last_id:
                flag = True
                infos = handle_line(segs, infos)
            else:
                print_line(last_id, infos)
                infos = [0] * (len(segs) - 1)
                last_id = segs[0]
                infos = handle_line(segs, infos)
        except (ValueError, IndexError):
            # A non-integer or missing column: the totals are reset to zeros
            # sized from the offending line, as the original code did.
            # NOTE(review): this discards what was accumulated for the
            # current key - confirm that is intended.
            infos = [0] * (len(line.split("\t")) - 1)
            continue
    if flag:
        print_line(last_id, infos)

hadoop-streaming模板

#!/bin/bash
# Launcher settings for a hadoop-streaming job: defaults first, then the
# command-line flags that override them.
set -x
HADOOP_HOME=/usr/bin/hadoop/software/hadoop/
hadoop=$HADOOP_HOME/bin/hadoop

# Mapper / reducer commands and task counts.
map=cat
red=cat
map_num=2000
red_num=400

# Job name and priority.
job_name=majing1-pd_template
priority=HIGH

# alarm: alarm subject (empty = no alarm); delete: non-zero removes the output
# directory before the run; download: local file receiving the job output.
alarm=''
delete=1
download=''

#compress="-jobconf mapred.compress.map.output=true -jobconf mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
compress="-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"

# Every flag is declared with a trailing ':' and therefore takes an argument,
# including -k and -c, which ignore its value.
while getopts "i:o:m:r:x:y:k:s:n:f:p:d:a:c:l:h:e:" opt; do
    case "$opt" in
        i) input=$OPTARG ;;
        o) output=$OPTARG ;;
        m) map=$OPTARG ;;
        r) red=$OPTARG ;;
        x) map_num=$OPTARG ;;
        y) red_num=$OPTARG ;;
        n) job_name=$OPTARG ;;
        f) f=$OPTARG ;;
        p) priority=$OPTARG ;;
        a) alarm=$OPTARG ;;
        d) delete=$OPTARG ;;
        l) download=$OPTARG ;;
        s)
            suffix_out="-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat"
            suffix_out_value="-jobconf suffix.multiple.outputformat.filesuffix=$OPTARG"
            ;;
        k)
            key_field="-partitioner org.apache.hadoop.mapred.lib.KeyFieldbasedPartitioner"
            key_field_value1="-jobconf stream.num.map.output.key.fields=2"
            key_field_value2="-jobconf num.key.fields.for.partition=1"
            ;;
        h)
            echo "cacheArchive:"$OPTARG
            cacheArchive="-cacheArchive $OPTARG"
            ;;
        e)
            echo "cacheFile:"$OPTARG
            cacheFile="-cacheFile $OPTARG"
            ;;
        c) unset compress ;;
        # Unquoted '?' matches any single character, so this arm must stay last.
        ?) echo '?' ;;
    esac
done

# Run the streaming job described by the launcher's global settings, check
# that it left a _SUCCESS marker, and optionally download the result.
# Globals (read): hadoop, delete, input, output, map, red, f, cacheArchive,
#   cacheFile, key_field, key_field_value1, key_field_value2, suffix_out,
#   suffix_out_value, compress, job_name, map_num, red_num, priority, alarm,
#   download
# Exits 255 when $output/_SUCCESS is missing after the job, posting an alarm
#   first if $alarm is non-empty.
function rawlog(){
        if [[ "${delete:-0}" -ne 0 ]]; then
            "$hadoop" fs -rmr "$output"
        fi

        # Only pass -file when a file was given; a bare -file would swallow
        # the option that follows it.
        local -a file_opt=()
        if [[ -n "${f:-}" ]]; then
            file_opt=(-file "$f")
        fi

        # $input and the pre-built option strings each hold several words and
        # are left unquoted so they split into separate arguments.
        # shellcheck disable=SC2086
        "$hadoop" streaming \
                -input $input \
                -output "$output" \
                -mapper "$map" \
                -reducer "$red" \
                "${file_opt[@]}" \
                $cacheArchive \
                $cacheFile \
                $key_field \
                $suffix_out \
                $suffix_out_value \
                $key_field_value1 \
                $key_field_value2 \
                $compress \
                -jobconf mapred.job.name="$job_name" \
                -jobconf mapred.map.tasks="$map_num" \
                -jobconf mapred.job.priority="$priority" \
                -jobconf mapred.reduce.tasks="$red_num" \
                -jobconf mapred.success.file.status=true \
                -jobconf mapred.reduce.slowstart.completed.maps=0.9999 \
                -jobconf mapreduce.job.queuename=hdp-svideo

        if ! "$hadoop" fs -test -e "$output/_SUCCESS"; then
                if [[ -n "${alarm:-}" ]]; then
                    curl --data-urlencode "subject=${alarm}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
                fi
                echo "$job_name failed"
                exit 255
        fi
        if [[ -n "${download:-}" ]]; then
                # The glob is quoted so that it reaches hadoop unexpanded
                # instead of being matched against local files.
                "$hadoop" fs -text "$output/*" > "$download"
        fi
}

# Run the job; rawlog takes no arguments and reads the global settings.
rawlog

shell工具模板

# Absolute paths to the hadoop launcher and to three Python interpreters.
HADOOP="/usr/bin/hadoop/software/hadoop/bin/hadoop"
PYTHON="/home/hdp-svideo-algo/majing1-pd/miniconda3/bin/python"
HPY3="/opt/soft/anaconda3/bin/python"
HPY2="/opt/soft/anaconda2/bin/python"

# Print a timestamped debug line.
# Arguments: $1 - message text (backslash escapes are interpreted by echo -e)
# Outputs:   "[DEBUG <YYYYmmdd-HH:MM:SS>]<message>" on stdout
function log(){
        local logstr=$1
        local now
        now=$(date "+%Y%m%d-%H:%M:%S")
        # Quoted so that runs of whitespace in the message are kept as written.
        echo -e "[DEBUG ${now}]${logstr}"
}

# Post an alarm to the video_select group.
# Arguments: $1 - subject text; curl URL-encodes it
function alarm(){
    local content=$1
    curl --data-urlencode "subject=${content}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
}

# Poll HDFS until <dir>/_SUCCESS exists or the poll budget is used up.
# Globals:   HADOOP (read) - hadoop launcher; log() must be defined.
# Arguments: $1 - HDFS directory expected to contain _SUCCESS
#            $2 - maximum number of polls
#            $3 - pause between polls, passed to sleep
#            $4 - label used as the prefix of alarm subjects
# Alarms:    timed out with a budget of 36 -> "<label>:exit,no_input", then
#              the script exits 255;
#            timed out with a budget of 2  -> "<label>:next,no_input";
#            loop ended with exactly 6 failed polls -> "<label>:wait,no_input".
function hdp_succ(){
    local hdfiles=$1
    local max=$2
    local waittime=$3
    local flag=$4
    local i
    for ((i = 0; i < max; i += 1)); do
        if "${HADOOP}" fs -test -e "${hdfiles}/_SUCCESS"; then
            break
        fi
        log "waiting..."
        sleep "$waittime"
    done
    # i reaches max only when every poll failed.
    if (( i == max && max == 36 )); then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if (( i == max && max == 2 )); then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    if (( i == 6 )); then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

# Poll HDFS until <dir>/<marker> exists or the poll budget is used up.
# Globals:   HADOOP (read) - hadoop launcher; log() must be defined.
# Arguments: $1 - HDFS directory to watch
#            $2 - maximum number of polls
#            $3 - pause between polls, passed to sleep
#            $4 - label used as the prefix of alarm subjects
#            $5 - name of the marker file inside the directory
# Alarms:    timed out with a budget of 36 -> "<label>:exit,no_input", then
#              the script exits 255;
#            timed out with a budget of 2  -> "<label>:next,no_input";
#            loop ended with exactly 6 failed polls -> "<label>:wait,no_input".
function hdp_succ_2(){
    local hdfiles=$1
    local max=$2
    local waittime=$3
    local flag=$4
    local succfile=$5
    local i
    for ((i = 0; i < max; i += 1)); do
        if "${HADOOP}" fs -test -e "${hdfiles}/${succfile}"; then
            break
        fi
        log "waiting..."
        sleep "$waittime"
    done
    # i reaches max only when every poll failed.
    if (( i == max && max == 36 )); then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if (( i == max && max == 2 )); then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    if (( i == 6 )); then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

# Check once that <dir>/_SUCCESS exists; otherwise alarm and abort the script.
# Globals:   HADOOP (read) - hadoop launcher
# Arguments: $1 - HDFS output directory
#            $2 - label; the alarm subject is "<label>_failed"
# Exits 255 when the marker is missing. A bare `exit` would pass on curl's
# status and could report success.
function result_succ(){
    local hdfiles=$1
    local flag=$2
    if ! "${HADOOP}" fs -test -e "${hdfiles}/_SUCCESS"; then
        curl --data-urlencode "subject=${flag}_failed" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
}

# Print the date N days before today as YYYYmmdd (GNU date).
# Arguments: $1 - number of days to go back
# Returns:   the exit status of date
function get_date(){
    local before_nday=$1
    date +%Y%m%d -d "${before_nday} days ago"
}

# Search back from today for the most recent day whose directory
# <prefix>/<YYYYmmdd>/<suffix> has a _SUCCESS marker and a reported size
# greater than the given minimum.
# Globals:   HADOOP (read) - hadoop launcher; get_date() must be defined.
# Arguments: $1 - path prefix before the date component
#            $2 - path suffix after the date component
#            $3 - minimum size; the directory must be strictly larger
# Outputs:   the matching date on stdout. If none of the 60 days qualifies,
#            the oldest date tried is printed.
function get_max_hdp_file(){
    local path1=$1
    local path2=$2
    local minline=$3
    local i day path_detail count
    for (( i = 0; i < 60; i++ )); do
        day=$(get_date "$i")
        path_detail=${path1}/${day}/${path2}
        if "${HADOOP}" fs -test -e "${path_detail}/_SUCCESS"; then
            # Take the first all-digit column as the size.
            # NOTE(review): the column order of `fs -dus` output appears to
            # differ between Hadoop releases - confirm for the cluster in use.
            count=$("${HADOOP}" fs -dus "${path_detail}" \
                | awk '{ for (c = 1; c <= NF; c++) if ($c ~ /^[0-9]+$/) { print $c; exit } }')
            if [[ -n "$count" ]] && (( count > ${minline:-0} )); then
                break
            fi
        fi
    done
    echo "${day}"
}

# Search back from today for the most recent day whose directory
# <prefix>/<YYYYmmdd><date-suffix>/<suffix> has a _SUCCESS marker and a
# reported size greater than the given minimum.
# Globals:   HADOOP (read) - hadoop launcher; get_date() must be defined.
# Arguments: $1 - path prefix before the date component
#            $2 - path suffix after the dated directory
#            $3 - minimum size; the directory must be strictly larger
#            $4 - text appended directly to the date inside the path
# Outputs:   the matching date on stdout. If none of the 30 days qualifies,
#            the oldest date tried is printed.
function get_max_hdp_file_2(){
    local path1=$1
    local path2=$2
    local minline=$3
    local path3=$4
    local i day path_detail count
    for (( i = 0; i < 30; i++ )); do
        day=$(get_date "$i")
        path_detail=${path1}/${day}${path3}/${path2}
        if "${HADOOP}" fs -test -e "${path_detail}/_SUCCESS"; then
            # Take the first all-digit column as the size.
            # NOTE(review): the column order of `fs -dus` output appears to
            # differ between Hadoop releases - confirm for the cluster in use.
            count=$("${HADOOP}" fs -dus "${path_detail}" \
                | awk '{ for (c = 1; c <= NF; c++) if ($c ~ /^[0-9]+$/) { print $c; exit } }')
            if [[ -n "$count" ]] && (( count > ${minline:-0} )); then
                break
            fi
        fi
    done
    echo "${day}"
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5681813.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存