Comparisons in shell do not use the plain >, =, < operators; inside [ ] you use the test operators below:
-eq  // equal
-ne  // not equal
-gt  // greater than
-lt  // less than
-ge  // greater than or equal
-le  // less than or equal
(All six take the leading "-" inside [ ]; the bare >= / <= forms are only valid in (( )) arithmetic contexts.)
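A minimal sketch of how these are used inside test brackets:

a=5
b=3
if [ $a -ge $b ]; then
    echo "a is greater than or equal to b"
fi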
Printing logs in shell scripts
log(){
    logstr=$1                          # message to print
    now=`date "+%Y%m%d-%H:%M:%S"`
    echo -e "[DEBUG ${now}] ${logstr}"
}
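Called like this (the timestamp below is illustrative):

log "pulling train data"
# [DEBUG 20240101-12:00:00] pulling train data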
Shell code that checks whether an MR job finished successfully
function hdp_succ(){
    hdfiles=$1   # HDFS output directory to watch
    max=$2       # maximum number of polls
    waittime=$3  # interval between polls, e.g. "5m"
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/_SUCCESS
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    # polled out on a hard dependency (36 rounds): alarm and abort
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=gpu2_deepfm:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
        exit 255
    fi
    # polled out on a soft dependency (6 rounds): alarm but keep going
    if [ $i -eq $max ] && [ $max -eq 6 ] ; then
        curl --data-urlencode "subject=gpu2_deepfm:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=alarm&"
    fi
}
hdp_succ ${hdfs_traindata} 36 "5m"
hdp_succ ${hdfs_evaldata} 36 "5m"
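The helper hinges on the exit status of hadoop fs -test -e, which is 0 only when the path exists. A standalone check, with an illustrative path:

${HADOOP} fs -test -e /home/hdp-svideo/output/20240101/_SUCCESS && echo "job finished"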
A shell module that pulls files from the Hadoop cluster to local disk
rm -rf $train_data
mkdir $train_data
# $NF = the file path, the last field of fs -ls output
files=`$HADOOP fs -ls ${hdfs_traindata} | awk -F ' ' '{print $NF}' | grep 'part'`
for file_name in $files
do
    echo $file_name
    $HADOOP fs -get $file_name ${train_data}/.
done
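A sanity check after the loop catches partial downloads; this sketch simply compares the part-file counts on both sides:

hdfs_cnt=`$HADOOP fs -ls ${hdfs_traindata} | grep -c 'part'`
local_cnt=`ls ${train_data} | grep -c 'part'`
if [ $local_cnt -ne $hdfs_cnt ]; then
    log "download incomplete: ${local_cnt}/${hdfs_cnt} part files"
    exit 255
fi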
Shell engineering: lessons learned
Getting dates
TODAY=`date +%Y%m%d`
lastday=`date +"%Y-%m-%d" -d "-1 day"`
lastday_v2=`date +"%Y%m%d" -d "-1 day"`
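The same -d offset generalizes to ranges; for example, enumerating the last 7 days, newest first:

for (( n=1; n<=7; n++ )); do
    d=`date +%Y%m%d -d "-$n day"`
    echo $d
done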
Multi-path input for hadoop-streaming
INPUT_PAT="" for ((k=$((i+0));k<$((i+30));k+=1));do day_tmp=`date +%Y%m%d -d "$k days ago"` INPUT_PAT="${hdfs_feedback_log_dir}/${day_tmp} ${INPUT_PAT}" done hadoop streaming -input $INPUT_PAT
A generic hadoop-streaming reducer (for BI-style aggregation)
import sys

def handle_line(segs, infos):
    # accumulate every numeric column after the key
    for i in range(len(infos)):
        infos[i] += int(segs[i + 1])
    return infos

def print_line(last_id, infos):
    print_infos = [last_id]
    for i in infos:
        print_infos.append(str(i))
    print('\t'.join(print_infos))

def reducer1():
    last_id = ""
    infos = []
    flag = False
    for line in sys.stdin:
        line = line.strip("\n")
        if len(line) > 0:
            try:
                segs = line.split("\t")
                if not flag:
                    # first record: initialise the accumulators
                    last_id = segs[0]
                    infos = [0] * (len(segs) - 1)
                if segs[0] == last_id:
                    flag = True
                    infos = handle_line(segs, infos)
                else:
                    # key changed: flush the previous key, then start the new one
                    print_line(last_id, infos)
                    infos = [0] * (len(segs) - 1)
                    last_id = segs[0]
                    infos = handle_line(segs, infos)
            except:
                infos = [0] * (len(line.split("\t")) - 1)
                continue
    # flush the final key
    print_line(last_id, infos)

if __name__ == "__main__":
    reducer1()
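Streaming guarantees the reducer sees its input sorted by key, so the script can be tested locally by recreating the shuffle with sort; the file and script names here are illustrative:

cat sample_counts.tsv | sort -t$'\t' -k1,1 | python reducer.py > local_result.tsv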
A hadoop-streaming template
#!/bin/bash
set -x
HADOOP_HOME=/usr/bin/hadoop/software/hadoop/
hadoop=$HADOOP_HOME/bin/hadoop
map=cat
red=cat
map_num=2000
red_num=400
job_name=majing1-pd_template
priority=HIGH
alarm=''
delete=1
download=''
# map-side + output compression, kept for reference:
#compress="-jobconf mapred.compress.map.output=true -jobconf mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
compress="-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
while getopts "i:o:m:r:x:y:k:s:n:f:p:d:a:c:l:h:e:" opt
do
    case "$opt" in
        a) alarm=$OPTARG;;
        d) delete=$OPTARG;;
        l) download=$OPTARG;;
        i) input=$OPTARG;;
        o) output=$OPTARG;;
        m) map=$OPTARG;;
        r) red=$OPTARG;;
        x) map_num=$OPTARG;;
        y) red_num=$OPTARG;;
        n) job_name=$OPTARG;;
        f) f=$OPTARG;;
        p) priority=$OPTARG;;
        s) suffix_out="-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat"
           suffix_out_value="-jobconf suffix.multiple.outputformat.filesuffix=$OPTARG";;
        k) key_field="-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"
           key_field_value1="-jobconf stream.num.map.output.key.fields=2"
           key_field_value2="-jobconf num.key.fields.for.partition=1";;
        h) echo "cacheArchive:"$OPTARG
           cacheArchive="-cacheArchive $OPTARG";;
        e) echo "cacheFile:"$OPTARG
           cacheFile="-cacheFile $OPTARG";;
        c) unset compress;;
        ?) echo '?';;
    esac
done
function rawlog(){
    if [ $delete -ne 0 ]; then
        $hadoop fs -rmr $output
    fi
    $hadoop streaming \
        -input $input -output "$output" \
        -mapper "$map" -reducer "$red" -file $f \
        $cacheArchive $cacheFile $key_field $suffix_out $suffix_out_value $key_field_value1 $key_field_value2 $compress \
        -jobconf mapred.job.name=$job_name \
        -jobconf mapred.map.tasks=$map_num \
        -jobconf mapred.job.priority=$priority \
        -jobconf mapred.reduce.tasks=$red_num \
        -jobconf mapred.success.file.status=true \
        -jobconf mapred.reduce.slowstart.completed.maps=0.9999 \
        -jobconf mapreduce.job.queuename=hdp-svideo
    $hadoop fs -test -e $output/_SUCCESS
    if [ $? -ne 0 ]; then
        if [ "$alarm" != '' ]; then
            curl --data-urlencode "subject=${alarm}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        fi
        echo "$job_name failed"
        exit 255
    fi
    if [[ $download != '' ]]; then
        $hadoop fs -text $output/* > $download
    fi
}
rawlog
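A hypothetical invocation of the template, with illustrative paths and script names: run a job with 100 reducers, alarm on failure, and pull the result to a local file:

sh streaming_template.sh \
    -i "/home/hdp-svideo/rawlog/20240101" \
    -o "/home/hdp-svideo/output/20240101" \
    -m "python job.py -m" \
    -r "python job.py -r" \
    -f job.py \
    -y 100 \
    -n my_daily_job \
    -a "my_daily_job_failed" \
    -l ./local_result.txt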
A shell utility template
HADOOP=/usr/bin/hadoop/software/hadoop/bin/hadoop
PYTHON=/home/hdp-svideo-algo/majing1-pd/miniconda3/bin/python
HPY3=/opt/soft/anaconda3/bin/python
HPY2=/opt/soft/anaconda2/bin/python

function log(){
    logstr=$1
    now=`date "+%Y%m%d-%H:%M:%S"`
    echo -e "[DEBUG ${now}] ${logstr}"
}

function alarm(){
    content=$1
    curl --data-urlencode "subject=${content}" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
}

# wait for $hdfiles/_SUCCESS, polling $max times with $waittime between polls
function hdp_succ(){
    hdfiles=$1
    max=$2
    waittime=$3
    flag=$4    # tag used in alarm subjects
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/_SUCCESS
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if [ $i -eq $max ] && [ $max -eq 2 ] ; then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    if [ $i -eq 6 ] ; then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

# same as hdp_succ, but with a configurable success-marker file name
function hdp_succ_2(){
    hdfiles=$1
    max=$2
    waittime=$3
    flag=$4
    succfile=$5
    for((i=0;i<$max;i+=1)); do
        ${HADOOP} fs -test -e $hdfiles/${succfile}
        if [ $? -eq 0 ] ; then
            break
        fi
        log "waiting..."
        sleep $waittime
    done
    if [ $i -eq $max ] && [ $max -eq 36 ] ; then
        curl --data-urlencode "subject=${flag}:exit,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit 255
    fi
    if [ $i -eq $max ] && [ $max -eq 2 ] ; then
        curl --data-urlencode "subject=${flag}:next,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
    if [ $i -eq 6 ] ; then
        curl --data-urlencode "subject=${flag}:wait,no_input" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
    fi
}

# alarm and exit if a result directory lacks _SUCCESS
function result_succ(){
    hdfiles=$1
    flag=$2
    ${HADOOP} fs -test -e $hdfiles/_SUCCESS
    if [ $? -ne 0 ] ; then
        curl --data-urlencode "subject=${flag}_failed" "http://alarm.mis.corp.qihoo.net:8360/alarm?group_name=video_select&"
        exit
    fi
}

# print the date N days ago as YYYYMMDD
function get_date(){
    before_nday=$1
    day=`date +%Y%m%d -d "$before_nday days ago"`
    echo ${day}
}

# scan back up to 60 days for the newest partition that has _SUCCESS and exceeds $minline bytes
function get_max_hdp_file(){
    path1=$1
    path2=$2
    minline=$3
    for (( i=0; i<60; i++ ));do
        date=$(get_date $i)
        path_detail=${path1}/${date}/${path2}
        ${HADOOP} fs -test -e ${path_detail}/_SUCCESS
        if [ $? -eq 0 ] ; then
            # fs -dus prints "<path> <size>", so field 2 is the total size
            count=`${HADOOP} fs -dus ${path_detail} | awk -F ' ' '{print $2}'`
            if [ $count -gt $minline ] ; then
                break
            fi
        fi
    done
    echo ${date}
}

# variant with an extra path segment appended to the date directory
function get_max_hdp_file_2(){
    path1=$1
    path2=$2
    minline=$3
    path3=$4
    for (( i=0; i<30; i++ ));do
        date=$(get_date $i)
        path_detail=${path1}/${date}${path3}/${path2}
        ${HADOOP} fs -test -e ${path_detail}/_SUCCESS
        if [ $? -eq 0 ] ; then
            count=`${HADOOP} fs -dus ${path_detail} | awk -F ' ' '{print $2}'`
            if [ $count -gt $minline ] ; then
                break
            fi
        fi
    done
    echo ${date}
}
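These helpers are meant to be sourced by job scripts. A hypothetical driver, with illustrative paths:

source ./tools.sh
day=$(get_max_hdp_file "/home/hdp-svideo/feature" "merged" 1000000)
log "latest usable partition: ${day}"
result_succ "/home/hdp-svideo/feature/${day}/merged" "feature_job"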