通过sqoop将mysql表导入到hive表(Python实现)

通过sqoop将mysql表导入到hive表(Python实现),第1张

通过sqoop将mysql表导入到hive表(Python实现)
# -*- coding: utf-8 -*-
from datetime import datetime

import paramiko
import re


class ShellHandler:
    """Run commands on a remote host through one persistent interactive SSH shell.

    A single shell channel is opened in ``__init__`` and reused by every
    ``execute`` call, so shell state (current directory, ``su`` sessions,
    exported variables) carries over from one command to the next.

    Instances can be used as context managers; ``close()`` may also be called
    explicitly. ``__del__`` remains as a last-resort cleanup.
    """

    # Sentinel echoed after every command; the exit status follows it on the
    # same line, which is how ``execute`` knows the command has finished.
    _FINISH = 'end of stdOUT buffer. finished with exit status'

    # CSI escape sequences (colours, cursor movement) emitted by the remote
    # terminal. Compiled once instead of on every output line.
    _ANSI_ESCAPE = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]')

    def __init__(self, host, user, psw):
        """Connect to ``host`` on port 22 with password auth and open a shell.

        :param host: remote host name or IP address
        :param user: SSH user name
        :param psw: SSH password
        """
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; acceptable for a lab script, risky in production.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(host, username=user, password=psw, port=22)

        channel = self.ssh.invoke_shell()
        self.stdin = channel.makefile('wb')
        self.stdout = channel.makefile('r')

    def close(self):
        """Close the SSH connection if one was created."""
        # getattr guards against instances whose __init__ never assigned
        # ``ssh`` (e.g. built with ``__new__``).
        ssh = getattr(self, 'ssh', None)
        if ssh is not None:
            ssh.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        self.close()

    def execute(self, cmd):
        """Run ``cmd`` in the remote shell and collect its output.

        The command is followed by an ``echo <sentinel> $?`` line; output is
        read until the sentinel shows up, and the number after it is taken as
        the command's exit status.

        :param cmd: the command to be executed on the remote computer
        :examples:  execute('ls')
                    execute('finger')
                    execute('cd folder_name')
        :returns: ``(stdin, shout, sherr)`` where ``stdin`` is the shell's
                  input stream and ``shout`` / ``sherr`` are lists of output
                  lines. The shell merges stderr into stdout, so on a
                  non-zero exit status all collected lines are returned in
                  ``sherr`` and ``shout`` is empty; otherwise ``sherr`` is
                  empty.
        :raises ValueError: if the text after the sentinel is not an integer
        """
        cmd = cmd.strip('\n')
        echo_cmd = 'echo {} $?'.format(self._FINISH)

        self.stdin.write(cmd + '\n')
        self.stdin.write(echo_cmd + '\n')
        self.stdin.flush()

        # An empty command would "match" every line with startswith('');
        # only match on the command text when there is some.
        echoes = (cmd, echo_cmd) if cmd else (echo_cmd,)

        shout = []
        sherr = []
        for line in self.stdout:
            line = str(line)
            if line.startswith(self._FINISH):
                # our finish command ends with the exit status
                exit_status = int(line.rsplit(maxsplit=1)[1])
                if exit_status:
                    # stderr is combined with stdout.
                    # thus, swap sherr with shout in a case of failure.
                    sherr = shout
                    shout = []
                break
            if line.startswith(echoes):
                # everything collected so far is shell junk echoed from stdin
                shout = []
            else:
                # get rid of 'coloring and formatting' special characters,
                # backspaces and carriage returns
                cleaned = self._ANSI_ESCAPE.sub('', line)
                shout.append(cleaned.replace('\b', '').replace('\r', ''))

        # first and last lines of shout/sherr contain a prompt plus the
        # echoed command / echoed sentinel command; drop them
        for lines in (shout, sherr):
            if lines and echo_cmd in lines[-1]:
                lines.pop()
            if lines and cmd and cmd in lines[0]:
                lines.pop(0)

        return self.stdin, shout, sherr


def import_data(shell_handler):
    """Import MySQL table ``tbl_base_line`` into Hive through Sqoop.

    Switches the remote shell to the ``hdfs`` user, then runs one
    ``sqoop import`` that overwrites today's ``record_date`` partition of
    ``shenzhen_shenba.tbl_base_line``. The command output is printed,
    followed by a success or failure message.

    :param shell_handler: object with an ``execute(cmd)`` method returning
                          ``(stdin, stdout_lines, stderr_lines)``
    """
    # Current date, used as the partition value
    current_date = datetime.now().strftime("%Y-%m-%d")

    # Switch to the hdfs user
    shell_handler.execute('su hdfs')

    # tbl_base_line
    target_dir = (
        "/user/hive/warehouse/shenzhen_shenba.db/tbl_base_line/"
        f"record_date={current_date}/"
    )
    # NOTE(review): the password on the command line is visible in the
    # remote process list and shell history.
    sqoop_command = " ".join([
        "sqoop import",
        "--connect jdbc:mysql://xxx.xxx.xx.xx:4000/db_base_data?tinyInt1isBit=false",
        "--username xxxxx",
        "--password xxxxx",
        # The query is double-quoted for the shell, so $CONDITIONS must be
        # escaped or the shell expands it to an empty string before Sqoop
        # can substitute its own split condition.
        '--query "select * from tbl_base_line where \\$CONDITIONS"',
        # Send the two characters backslash + t, not a real TAB: a literal
        # TAB typed into an interactive shell triggers completion.
        "--fields-terminated-by '\\t'",
        "--delete-target-dir",
        "--hive-import",
        "--m 1",
        "--hive-partition-key record_date",
        f"--hive-partition-value {current_date}",
        "--hive-database shenzhen_shenba",
        "--hive-table tbl_base_line",
        f"--target-dir {target_dir}",
        "--hive-overwrite",
        "--direct;",
    ])

    shin, shout, sherr = shell_handler.execute(sqoop_command)
    for s in shout:
        print(s)
    for s in sherr:
        print(s)

    # execute() only fills sherr when the command exited non-zero
    if sherr:
        print("tbl_base_line import failed!")
    else:
        print("tbl_base_line import complete!")


if __name__ == '__main__':
    # Full import of the MySQL source table into Hive via Sqoop,
    # partitioned by day.
    shell_handler = ShellHandler(host='xxx.xxx.xx.xx', user='root', psw='xxxxxx')
    import_data(shell_handler)

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5683326.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存