1、每天利用python执行hive sql脚本
2、执行前检测依赖表是否生成
3、依赖表如果生成成功会touch一个success文件到hdfs目录
4、循环检测一定次数,失败抛出异常
上代码:#coding:utf-8 #@auth: lgy import datetime import subprocess import time import sys #获取昨天日期 def get_yesterday(format="%Y%m%d"): today = datetime.date.today() yesterday = today + datetime.timedelta(days=-1) return yesterday.strftime(format) #检测文件 def work(): yesterday = get_yesterday() error_count = 0 #需要检测的hdfs路径 hdfs_paths = ['hdfs://dt=%s/_SUCCESS'%yesterday, 'hdfs://dt=%s/_SUCCESS'%yesterday] #循环检测 for hdfs_path in hdfs_paths: #检测语句 filexistchk = "hdfs dfs -test -e " + hdfs_path + ";echo $?" while 1: #执行检测 filexistchk_output = subprocess.Popen(filexistchk, shell=True, stdout=subprocess.PIPE).communicate() #如果存在 if '1' not in str(filexistchk_output[0]): print (hdfs_path+" is exists!") break else: error_count += 1 if error_count==50: #50次没有检测出来 抛出异常 raise Exception("依赖表生成失败!") now_t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print (hdfs_path+" %s not exists"%(now_t)) time.sleep(10*60) #如果没有异常 执行hive脚本 execute_hive_path='/' execute_hive="/app/hive/bin/hive --hivevar yesterday=%s -f %s.sql"%(yesterday,execute_hive_path+"/"+'file_name') print(execute_hive) #子进程执行hive p = subprocess.Popen(execute_hive, shell=True,stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) #循环读取子进程执行情况,读取子进程返回结果 while p.poll()==None: print(p.stdout.readline()) #如果返回code为失败 if(p.returncode!=0): errorInfo = p.stdout.read() print("%s execute failed,error info: %s"%('file_name', errorInfo)) sys.exit(1) #有错误退出 else: print("%s execute success!") sys.exit(0) #无错误退出
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)