# -*- coding: utf-8 -*-
"""Full import of a MySQL table into a date-partitioned Hive table with Sqoop,
driven through an interactive SSH shell on the Hadoop host."""
from datetime import datetime
import re
import shlex

# Matches ANSI CSI escape sequences (colours, cursor control) that an
# interactive terminal mixes into its output.
_ANSI_CSI_RE = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]')

# Placeholders: fill in real values.
# NOTE(review): a plain-text --password is visible in the remote process list;
# Sqoop's --password-file is the safer option.
MYSQL_JDBC_URL = 'jdbc:mysql://xxx.xxx.xx.xx:4000/db_base_data?tinyInt1isBit=false'
MYSQL_USER = 'xxxxx'
MYSQL_PASSWORD = 'xxxxx'
HIVE_DATABASE = 'shenzhen_shenba'


def strip_terminal_codes(line):
    """Return *line* without ANSI escape sequences, backspaces or CRs."""
    return _ANSI_CSI_RE.sub('', line).replace('\b', '').replace('\r', '')


class ShellHandler:
    """Run commands one after another in a single interactive SSH shell.

    All commands share one shell session, so state such as the working
    directory or a preceding ``su`` carries over between ``execute`` calls.
    Use as a context manager (or call ``close()``) to release the connection.
    """

    def __init__(self, host, user, psw, port=22):
        # Imported here so the pure helpers in this module can be imported
        # and tested without paramiko installed.
        import paramiko

        self.last_exit_status = None  # exit status of the latest execute()
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, giving no
        # protection against man-in-the-middle attacks; consider
        # load_system_host_keys() with RejectPolicy in production.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(host, username=user, password=psw, port=port)
        channel = self.ssh.invoke_shell()
        self.stdin = channel.makefile('wb')
        self.stdout = channel.makefile('r')

    def close(self):
        """Close the SSH connection; safe to call repeatedly or after a
        failed ``__init__``."""
        ssh = getattr(self, 'ssh', None)
        if ssh is not None:
            ssh.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        self.close()

    def execute(self, cmd):
        """Run *cmd* in the remote shell and block until it has finished.

        :param cmd: the command to be executed on the remote computer
        :return: ``(stdin_file, stdout_lines, stderr_lines)``. The interactive
            shell merges stderr into stdout, so when the command exits with a
            non-zero status all captured lines are returned as
            ``stderr_lines`` and ``stdout_lines`` is empty. The exit status is
            also stored in ``self.last_exit_status``.
        :examples:
            execute('ls')
            execute('finger')
            execute('cd folder_name')
        """
        cmd = cmd.strip('\n')
        self.stdin.write(cmd + '\n')
        # A sentinel echoed after the command marks the end of its output and
        # carries its exit status ($?).
        finish = 'end of stdOUT buffer. finished with exit status'
        echo_cmd = 'echo {} $?'.format(finish)
        self.stdin.write(echo_cmd + '\n')
        shin = self.stdin
        self.stdin.flush()

        shout = []
        sherr = []
        exit_status = 0
        for line in self.stdout:
            # `cmd and`: an empty cmd makes startswith('') match every line,
            # including the sentinel, and the loop would never end.
            if (cmd and line.startswith(cmd)) or line.startswith(echo_cmd):
                # Everything captured so far is shell noise echoed from stdin.
                shout = []
            elif line.startswith(finish):
                # The sentinel line ends with the exit status.
                exit_status = int(line.rsplit(maxsplit=1)[1])
                if exit_status:
                    # stderr is merged into stdout; report it as stderr on failure.
                    sherr = shout
                    shout = []
                break
            else:
                shout.append(strip_terminal_codes(line))

        # The first and last captured lines contain the prompt followed by the
        # echoed command / sentinel command; drop them.
        for captured in (shout, sherr):
            if captured and echo_cmd in captured[-1]:
                captured.pop()
            if captured and cmd in captured[0]:
                captured.pop(0)

        self.last_exit_status = exit_status
        return shin, shout, sherr


def build_sqoop_command(table, record_date):
    """Return a one-line ``sqoop import`` shell command for *table*.

    The command re-imports every row of *table* from MySQL and overwrites the
    Hive partition ``record_date=<record_date>`` of ``HIVE_DATABASE.<table>``.

    :param table: MySQL source table; the Hive table gets the same name.
    :param record_date: partition value, e.g. ``'2024-01-31'``.
    :return: the command as a single line of shell text.
    """
    q = shlex.quote
    # NOTE(review): target-dir points inside the Hive partition that
    # --hive-import loads into; confirm this staging location is intended.
    target_dir = '/user/hive/warehouse/{}.db/{}/record_date={}/'.format(
        HIVE_DATABASE, table, record_date)
    options = [
        'sqoop import',
        '--connect ' + q(MYSQL_JDBC_URL),  # quoted: the URL contains '?'
        '--username ' + q(MYSQL_USER),
        '--password ' + q(MYSQL_PASSWORD),
        # Single-quoted so the remote shell passes $CONDITIONS through
        # literally; Sqoop substitutes it itself.
        '--query ' + q('select * from {} where $CONDITIONS'.format(table)),
        # Backslash + 't' (two characters), which Sqoop decodes as TAB. A raw
        # TAB typed into an interactive shell would trigger tab completion.
        '--fields-terminated-by ' + q('\\t'),
        '--delete-target-dir',
        '--hive-import',
        '--num-mappers 1',
        '--hive-partition-key record_date',
        '--hive-partition-value ' + q(record_date),
        '--hive-database ' + q(HIVE_DATABASE),
        '--hive-table ' + q(table),
        '--target-dir ' + q(target_dir),
        '--hive-overwrite',
        '--direct',
    ]
    return ' '.join(options)


def import_data(shell_handler, table='tbl_base_line'):
    """Fully import *table* from MySQL into today's Hive partition.

    :param shell_handler: a connected ShellHandler; its session is switched to
        the ``hdfs`` user and stays that way afterwards.
    :param table: table to import (same name in MySQL and Hive).
    :return: True if the sqoop command exited with status 0, otherwise False.
    """
    # Today's date is the partition value.
    current_date = datetime.now().strftime('%Y-%m-%d')

    # Switch to the hdfs user before running sqoop.
    shell_handler.execute('su hdfs')
    status = getattr(shell_handler, 'last_exit_status', 0)
    if status:
        print('su hdfs failed with exit status {}'.format(status))
        return False

    _, shout, sherr = shell_handler.execute(build_sqoop_command(table, current_date))
    for line in shout + sherr:
        print(line.rstrip('\n'))

    status = getattr(shell_handler, 'last_exit_status', 0)
    if status:
        print('{} import failed with exit status {}'.format(table, status))
        return False
    print('{} import complete!'.format(table))
    return True


if __name__ == '__main__':
    # Full import from the MySQL source table into Hive via sqoop,
    # partitioned by day.
    host = 'xxx.xxx.xx.xx'
    user = 'root'
    password = 'xxxxxx'
    with ShellHandler(host, user, password) as shell_handler:
        if not import_data(shell_handler):
            raise SystemExit(1)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)