Python实现Paramiko的二次封装

Python实现Paramiko的二次封装,第1张

概述Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考,目前本人巡

Paramiko是一个用于执行SSH命令的Python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考,目前本人巡检百台设备完全无压力。

实现命令执行: 直接使用过程化封装,执行CMD命令.

import paramikossh = paramiko.SSHClIEnt()ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())def BatchCMD(address,username,password,port,command):    try:        ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)        stdin,stdout,stderr = ssh.exec_command(command)        result = stdout.read()        if len(result) != 0:            result = str(result).replace("\n","\n")            result = result.replace("b'","").replace("'","")            return result        else:            return None    except Exception:        return None

实现磁盘巡检: 获取磁盘空间并返回字典格式

def GetAlldiskSpace(address,port):    ref_dict = {}    cmd_dict = {"linux\n" : "df | grep -v 'filesystem' | awk '{print  \":\" }'","AIX\n" : "df | grep -v 'filesystem' | awk '{print  \":\" }'"                }    # 首先检测系统版本    os_version = BatchCMD(address,"uname")    for version,run_cmd in cmd_dict.items():        if(version == os_version):            # 根据不同版本选择不同的命令            os_ref = BatchCMD(address,run_cmd)            ref_List= os_ref.split("\n")            # 循环将其转换为字典            for each in ref_List:                # 判断最后是否为空,过滤最后一项                if each != "":                    ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])    return ref_dict# 磁盘巡检总函数def diskMain():    with open("db.Json","r",enCoding="utf-8") as read_fp:        load_Json = read_fp.read()        Js = Json.loads(load_Json)        base = Js.get("base")        count = len(base)        for each in range(0,count):            print("3[37m-3[0m" * 80)            print("3[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}3[0m".                format(base[each][1],base[each][2],base[each][3],base[each][4]))            print("3[37m-3[0m" * 80)            ref = GetAlldiskSpace(base[each][1],base[each][4])            for k,v in ref.items():                # 判断是否存在空盘                if( v.split("%")[0] != "-"):                    # 将占用百分比转换为整数                    space_ret  = int(v.split("%")[0])                    if space_ret >= 70:                        print("3[31m 磁盘分区: {0:30} \t 磁盘占用: {1:5} 3[0m".format(k,v))                        continue                    if space_ret >= 50:                        print("3[33m 磁盘分区: {0:30} \t 磁盘占用: {1:5} 3[0m".format(k,v))                        continue                    else:                        print("3[34m 磁盘分区: {0:30} \t 磁盘占用: {1:5} 3[0m".format(k,v))                        continue            print()# 组内传递用户名密码时调用此方法def GroupdiskMain(address,port):    ref = GetAlldiskSpace(address,port)    for k,v in ref.items():        if (v.split("%")[0] != "-"):            space_ret = int(v.split("%")[0])            if space_ret >= 70:                print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k,v))                continue            if space_ret >= 50:                print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k,v))                continue            else:                print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [正常]".format(k,v))                continue    print()

获取系统内存利用率: 获取系统内存利用率

def GetAllMemSpace(address,port):    cmd_dict = {"linux\n" : "cat /proc/meminfo | head -n 2 | awk '{print }' | xargs | awk '{print  \":\" }'",run_cmd)            # 首先现将KB转化为MB            mem_total = math.ceil( int(os_ref.split(":")[0].replace("\n","")) / 1024)            mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)            mem_used = str( int(mem_total) - int(mem_free))            # 计算占用空间百分比            percentage = 100 - int(mem_free / int(mem_total / 100))            print("内存总计空间: {}".format(str(mem_total) + " MB"))            print("内存剩余空间: {}".format(str(mem_free) + " MB"))            print("内存已用空间: {}".format(str(mem_used) + " MB"))            print("计算百分比: {}".format(str(percentage) + " %"))

获取系统进程信息: 获取系统进程信息,并返回字典格式

def GetAllProcessspace(address,port):    ref_dict = {}    cmd_dict = {"linux\n" : "ps aux | grep -v 'USER' | awk '{print  \":\" }' | uniq","AIX\n" : "ps aux | grep -v 'USER' | awk '{print  \":\" }' | uniq"                }    os_version = BatchCMD(address,run_cmd in cmd_dict.items():        if(version == os_version):            os_ref = BatchCMD(address,run_cmd)            ref_List = os_ref.split("\n")            for each in ref_List:                if each != "":                    ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])    return ref_dict# 巡检进程是否存在def ProcessMain():    with open("db.Json",enCoding="utf-8") as read_fp:        load_Json = read_fp.read()        Js = Json.loads(load_Json)        process = Js.get("process")        process_count = len(process)        for x in range(0,process_count):            # 根据process中的值查询base中的账号密码            base = Js.get("base")            if( List(process[x].keys())[0] == base[x][0] ):                # 拿到账号密码之后再提取出他们的进程ID于进程名                print("3[37m-3[0m" * 80)                print("3[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}3[0m".                      format(base[x][1],base[x][2],base[x][3],base[x][4]))                print("3[37m-3[0m" * 80)                ref_dic = GetAllProcessspace(base[x][1],base[x][4])                # ref_val = 全部进程列表 proc_val = 需要检测的进程列表                ref_val = List(ref_dic.values())                proc_val = List(process[x].values())[0]                # 循环比较是否在列表中                for each in proc_val:                    flag = each in ref_val                    if(flag == True):                        print("3[34m 进程: {0:50} 状态: √ 3[0m".format(each))                    else:                        print("3[31m 进程: {0:50} 状态: × 3[0m".format(each))

实现剧本运行功能: 针对特定一台主机运行剧本功能,随便写的一个版本,仅供参考

def RunRule(address,playbook):    os_version = BatchCMD(address,"uname")    if(os_version == List(playbook.keys())[0]):        play = List(playbook.values())[0]        print()        print("3[37m-3[0m" * 130)        print("3[35m 系统类型: {0:4} \t 地址: {1:10} \t 用户名: {2:10} \t 密码: {3:15} \t 端口: {4:4}3[0m"              .format(os_version.replace("\n",""),address,port))        print("3[37m-3[0m" * 130)        for each in range(0,len(play)):            runcmd = play[each] + " > /dev/null 2>&1 && echo $?"            print("3[30m [>] 派发命令: {0:100} \t 状态: {1:5} 3[0m".format(                runcmd.replace(" > /dev/null 2>&1 && echo $?","正在派发"))            os_ref = BatchCMD(address,runcmd)            if(os_ref == "0\n"):                print("3[34m [√] 运行命令: {0:100} \t 状态: {1:5} 3[0m".format(                    runcmd.replace(" > /dev/null 2>&1 && echo $?","派发完成"))            else:                print("3[31m [×] 运行命令: {0:100} \t 状态: {1:5} 3[0m".format(                    runcmd.replace(" > /dev/null 2>&1 && echo $?","派发失败"))                # 既然失败了,就把剩下的也打出来吧,按照失败处理                for x in range(each+1,len(play)):                    print("3[31m [×] 运行命令: {0:100} \t 状态: {1:5} 3[0m".format(                        play[x].replace(" > /dev/null 2>&1 && echo $?","终止执行"))                break    else:        return 0# 批量: 传入主机组不同主机执行不同剧本def RunPlayBook(HostList,PlayBook):    count = len(HostList)    error = []    success = []    for each in range(0,count):        ref = RunRule(HostList[each][0],HostList[each][1],HostList[each][2],HostList[each][3],PlayBook)        if ref == 0:            error.append(HostList[each][0])        else:            success.append(HostList[each][0])    print("\n\n")    print("-" * 130)    print("执行清单")    print("-" * 130)    for each in success:        print("成功主机: {}".format(each))    for each in error:        print("失败主机: {}".format(each))# 运行测试def PlayBookRun():    playbook = \        {            "linux\n":                [                    "ifconfig","vmstat","ls","netstat -an","ifconfis","cat /etc/passwd | grep 'root' | awk '{print }'"                ]        }    addr_List = \        [            ["192.168.1.127","root","1233","22"],["192.168.1.126","1203","22"]        ]    # 指定addr_List这几台机器执行playbook剧本    RunPlayBook(addr_List,playbook)

过程化实现文件上传下载: 文件传输功能 PUT上传 GET下载

def BatchSFTP(address,soruce,target,flag):    transport = paramiko.Transport((address,int(port)))    transport.connect(username=username,password=password)    sftp = paramiko.SFTPClIEnt.from_transport(transport)    if flag == "PUT":        try:            ret = sftp.put(soruce,target)            if ret !="":                transport.close()                return 1            else:                transport.close()                return 0            transport.close()        except Exception:            transport.close()            return 0    elif flag == "GET":        try:            target = str(address + "_" + target)            os.chdir("./recv_file")            ret = sftp.get(soruce,target)            if ret != "":                transport.close()                return 1            else:                transport.close()                return 0            transport.close()        except Exception:            transport.close()            return 0# 批量将本地文件 source 上传到目标 target 中def PutRemotefile(source,target):    with open("db.Json",enCoding="utf-8") as fp:        load_Json = fp.read()        Js = Json.loads(load_Json)        base = Js.get("base")        print("-" * 130)        print("接收主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t 本地文件 \t\t 传输到 \t\t\t 传输状态")        print("-" * 130)        for each in range(0,len(base)):            # 先判断主机是否可通信            ref = BatchCMD(base[each][1],base[each][4],"uname")            if ref == None:                print("3[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未连通3[0m".format(                    base[each][1],source,target))                continue            ref = BatchSFTP(base[each][1],"PUT")            if(ref == 1):                print("3[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功3[0m".format(                    base[each][1],target))            else:                print("3[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输失败3[0m".format(                    base[each][1],target))# 批量将目标文件拉取到本地特定目录(存在缺陷)def GetRemotefile(source,enCoding="utf-8") as fp:        load_Json = fp.read()        Js = Json.loads(load_Json)        base = Js.get("base")        print("-" * 130)        print("发送主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t\t 远程文件 \t\t 拉取到 \t\t\t 传输状态")        print("-" * 130)        for each in range(0,len(base)):            ref = BatchCMD(base[each][1],"GET")            if(ref == 1):                print("3[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功3[0m".format(                    base[each][1],target))

简单实现批量执行SSH命令

import os,paramikossh = paramiko.SSHClIEnt()ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())def BatchCMD(address,stderr = ssh.exec_command(command)        result = stdout.read()        if len(result) != 0:            print('3[0mIP: {} Username:{} Port: {} Status: OK'.format(address,port))            return 1        else:            print('3[45mIP: {} Username:{} Port: {} Status: Error'.format(address,port))            return 0    except Exception:        print('3[45mIP: {} Username:{} Port: {} Status: Error'.format(address,port))        return 0if __name__ == "__main__":    fp = open("ip.log","r+")    for temp in fp.readlines():        ip = temp.split("\n")[0]        BatchCMD(ip,"22","ls && echo $?")

简单实现批量SFTP远程传输

import paramikodef BatchSFTP(address,target)            if ret !="":                print("Addr:{}  Username:{}  Source:{}  Target:{} Success".format(address,target))                return 1            else:                print("Addr:{}  Username:{}  Source:{}  Target:{} Error".format(address,target))                return 0            transport.close()        except Exception:            return 0            transport.close()    elif flag == "GET":        try:            target = str(target + "_" + address)            ret = sftp.get(soruce,target)            if ret != "":                print("Addr:{}  Username:{}  Source:{}  Target:{} Success".format(address,target))                return 0            transport.close()        except Exception:            return 0if __name__ == "__main__":    # 将本地文件./main.py上传到/tmp/main.py    BatchSFTP("192.168.1.20","./main.py","/tmp/main.py","PUT")    # 将目标主机下的/tmp/main.py拷贝到本地文件./get/test.py    BatchSFTP("192.168.1.20","./get/test.py","GET")

通过SSH模块获取系统内存数据 这里我写了一个简单的获取内存数据的脚本,当然获取cpu磁盘等,同样可以这样来搞.

import os,paramiko,ressh = paramiko.SSHClIEnt()ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())def SSH_Get_Mem():    dict ={}    List = []    head =["MemTotal","MemFree","Cached","SwapTotal","SwapFree"]    ssh.connect(hostname="192.168.1.20",username="root",password="1233",port=22,timeout=2)    stdin,stderr = ssh.exec_command('cat /proc/meminfo')    string = str(stdout.read())    for i in [0,1,4,14,15]:  # 取出列表中的这几行        Total = string.split("\n")[i].split(":")[1].replace(" kB","").strip()        List.append(Total)    for (head,List) in zip(head,List):        dict[head]=int(List);  # 组合成一个字典    return dictif __name__ == "__main__":    for i in range(10):        dic = SSH_Get_Mem()        print(dic)

fabric的使用 fabric工具也是自动化运维利器,其默认依赖于paramiko的二次封装.

# 简单实现命令执行from fabric import Connectionconn = Connection(host="192.168.1.10",user="root",port="22",connect_kwargs={"password":"123"})try:    with conn.cd("/var/www/HTML/"):        ret = conn.run("ls -lh",hIDe=True)        print("主机:" + conn.host + "端口:" + conn.port + "完成")except Exception:    print("主机:" + conn.host + "端口:" + conn.port + "失败")# 读取数据到本地from fabric import Connectionconn = Connection(host="192.168.1.20",connect_kwargs={"password":"123"})uname = conn.run('uname -s',hIDe=True)if 'linux' in uname.stdout:    command = "df -h / | tail -n1 | awk '{print }'"    print(conn.run(command,hIDe=True).stdout.strip())# 文件上传与下载from fabric import Connectionconn = Connection(host="192.168.1.20",connect_kwargs={"password":"123"})conn.put("D://zabbix_get.exe","/tmp/zabbix.exe")  # 文件上传conn.get("/tmp/zabbix.exe","./zab.exe")           # 下载文件

另一种命令执行方法:

import paramikossh = paramiko.SSHClIEnt()ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())def BatchCMD(address,stderr = ssh.exec_command(command)        result = stdout.read()        if len(result) != 0:            return result        else:            return -1    except Exception:        return -1# 通过获取主机Ping状态def GetPing():    fp = open("unix_base.db",enCoding="utf-8")    count = len(open("unix_base.db",enCoding="utf-8").readlines())    print("-" * 100)    print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".format("IP地址","机器系统","设备SN","机房位置","存活状态","主机作用"))    print("-" * 100)    for each in range(count):        ref = eval(fp.readline())        ret = BatchCMD(ref[0],ref[5],ref[6],22,"pwd | echo $?")        if(int(ret)==0):            print("{0:20} \t {1:10} \t {2:11} \t {3:5} \t {4:9} \t {5:40}".                  format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))        else:            print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".                  format(ref[0],"异常",ref[4]))    fp.close()# ps aux | grep "usbCfgDev" | grep -v "grep" | awk {'print '}def GetProcessstatus():    fp = open("unix_process.db",enCoding="utf-8")    count = len(open("unix_process.db",enCoding="utf-8").readlines())    for each in range(count):        proc = eval(fp.readline())        proc_len = len(proc)        print("-" * 70)        print("---> 巡检地址: {0:10} \t 登录用户: {1:7} \t 登录密码: {2:10}".format(proc[0],proc[1],proc[2]))        print("-" * 70)        for process in range(3,proc_len):            command = "ps aux | grep \'{}\' | grep -v \'grep\' | awk '{}' | head -1".format(proc[process],"{print }")            try:                ref = BatchCMD(proc[0],proc[2],command)                if(int(ref)!=-1):                    print("进程: {0:18}  \t PID: {1:10} \t 状态: {2}".format(proc[process],int(ref),"√"))                else:                    print("进程: {0:18} \t  PID:{1:10} \t 状态: {2}".format(proc[process],"×"))            except Exception:                print("进程: {0:18} \t  PID:{1:10} \t 状态: {2}".format(proc[process],"×"))        print()    fp.close()def GetdiskStatus():    fp = open("unix_disk.db",enCoding="utf-8")    count = len(open("unix_disk.db",enCoding="utf-8").readlines())    for each in range(count):        proc = eval(fp.readline())        proc_len = len(proc)        print("-" * 100)        print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".              format(proc[0],proc[3]))        print("-" * 100)        try:            ref = BatchCMD(proc[0],proc[3],"df | grep -v 'filesystem'")            st = str(ref).replace("\n","\n")            print(st.replace("b'",""))        except Exception:            pass        print()    fp.close()# 运行命令def runcmd(command,system):    fp = open("unix_disk.db",enCoding="utf-8").readlines())    for each in range(count):        proc = eval(fp.readline())        proc_len = len(proc)        if proc[1] == system:            print("-" * 100)            print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".                  format(proc[0],proc[3]))            print("-" * 100)            try:                ref = BatchCMD(proc[0],command)                st = str(ref).replace("\n","\n")                print(st.replace("b'",""))            except Exception:                pass    fp.close()

面向对象的封装方法: 使用面向对象封装,可极大的提高复用性。

import paramikoclass MySSH:    def __init__(self,default_port = 22):        self.address = address        self.default_port = default_port        self.username = username        self.password = password        self.obj = paramiko.SSHClIEnt()        self.obj.set_missing_host_key_policy(paramiko.autoAddPolicy())        self.obj.connect(self.address,self.default_port,self.username,self.password)        self.obJsftp = self.obj.open_sftp()    def BatchCMD(self,command):        stdin,stderr = self.obj.exec_command(command)        result = stdout.read()        if len(result) != 0:            result = str(result).replace("\n","")            return result        else:            return None    def GetRemotefile(self,remotepath,localpath):        self.obJsftp.get(remotepath,localpath)    def PutLocalfile(self,localpath,remotepath):        self.obJsftp.put(localpath,remotepath)    def GetfileSize(self,file_path):        ref = self.BatchCMD("du -s " + file_path + " | awk '{print }'")        return ref    def CloseSSH(self):        self.obJsftp.close()        self.obj.close()if __name__ == '__main__':    ssh = MySSH('192.168.191.3','root','1233',22)    ref = ssh.BatchCMD("ifconfig")    print(ref)    sz = ssh.GetfileSize("/etc/passwd")    print(sz)    ssh.CloseSSH()

第二次封装完善。

import paramiko,os,Json,reclass MySSH:    def __init__(self,default_port = 22):        self.address = address        self.default_port = default_port        self.username = username        self.password = password        try:            self.obj = paramiko.SSHClIEnt()            self.obj.set_missing_host_key_policy(paramiko.autoAddPolicy())            self.obj.connect(self.address,self.password,timeout=3,allow_agent=False,look_for_keys=False)            self.obJsftp = self.obj.open_sftp()        except Exception:            pass    def BatchCMD(self,command):        try:            stdin,stderr = self.obj.exec_command(command,timeout=3)            result = stdout.read()            if len(result) != 0:                result = str(result).replace("\n","\n")                result = result.replace("b'","")                return result            else:                return None        except Exception:            return None    def GetRemotefile(self,remote_path,local_path):        try:            self.obJsftp.get(remote_path,local_path)            return True        except Exception:            return False    def PutLocalfile(self,remotepath):        try:            self.obJsftp.put(localpath,remotepath)            return True        except Exception:            return False    def CloseSSH(self):        self.obJsftp.close()        self.obj.close()    # 获取文件大小    def GetfileSize(self,file_path):        ref = self.BatchCMD("du -s " + file_path + " | awk '{print }'")        return ref.replace("\n","")    # 判断文件是否存在    def Isfile(self,file_path):        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

通过Eval函数解析执行: 自定义语法规则与函数,通过Eval函数实现解析执行. 没写完,仅供参考。

import Json,sys,mathimport argparse,time,reimport paramikossh = paramiko.SSHClIEnt()ssh.set_missing_host_key_policy(paramiko.autoAddPolicy())def BatchCMD(address,"")            return result        else:            return None    except Exception:        return None# ------------------------------------------------------------------------# 内置解析方法def Getdisk(x):    return str(x)def GetcpuLoad():    return str(10)# 句式解析器,解析句子并执行def judge(string):    # 如果匹配到IF则执行判断条件解析    if re.findall(r'IF{ (.*?) }',string,re.M) != []:        # 则继续提取出他的表达式        ptr = re.compile(r'IF[{] (.*?) [}]',re.S)        subject = re.findall(ptr,string)[0]        subject_List = subject.split(" ")        # 拼接语句并执行        sentence = eval(subject_List[0]) + subject_List[1] + subject_List[2]        # 组合后执行,返回结果        if eval(sentence):            return "IF"        else:            return False    # 如果匹配到put则执行上传解析    elif re.findall(r'PUT{ (.*?) }',re.M) != []:        print("put")    return False# 获取特定目录下所有的剧本def GetAllRule():    rootdir = os.getcwd() + "\rule\"    all_files = [f for f in os.Listdir(rootdir)]    print("-" * 90)    print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))    print("-" * 90)    for switch in all_files:        # 首先判断文件结尾是否为Json        if( switch.endswith(".Json") == True):            all_switch_dir = rootdir + switch            try:                # 判断文件内部是否符合JsON规范                with open(all_switch_dir,enCoding="utf-8") as read_file:                    # 判断是否存在指定字段来识别规范                    load = Json.loads(read_file.read())                    if load.get("framework") != None and load.get("task_sequence") != None:                        run_addr_count = len(load.get("address_List"))                        command_count = len(load.get("task_sequence"))                        print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".                              format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))            except ValueError:                pass# 指定一个剧本并运行def RunPlayBook(rule_name):    rootdir = os.getcwd() + "\rule\"    all_files = [f for f in os.Listdir(rootdir)]    for switch in all_files:        if( switch.endswith(".Json") == True):            all_switch_dir = rootdir + switch            # 寻找到需要加载的剧本地址            if( switch == rule_name):                with open(all_switch_dir,enCoding="utf-8") as read_file:                    data = Json.loads(read_file.read())                    address_List = data.get("address_List")                    # 循环每个主机任务                    for each in address_List:                        # 得到剧本内容                        task_sequence = data.get("task_sequence")                        default_port = data.get("default_port")                        print("-" * 90)                        print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))                        print("-" * 90)                        for task in task_sequence:                            flag = judge(task[0])                            if flag == "IF":                                ref = BatchCMD(each[0],each[2],default_port,task[1])                                print(ref)                            elif flag == False:                                ref = BatchCMD(each[0],task[0])                                print(ref)if __name__ == "__main__":    RunPlayBook("get_log.Json")

定义剧本规范如下。

{  "framework": "Centos","version": "7.0","address_List":  [    ["192.168.191.3","1233"]  ],"default_port": "22","task_sequence":  [    ["ifconfig"],["IF{ GetLastCmdFlag() == True }","uname"]  ]}

词法分析: 词法分析解析剧本内容。

# 获取特定目录下所有的剧本def GetAllRule():    rootdir = os.getcwd() + "\rule\"    all_files = [f for f in os.Listdir(rootdir)]    print("-" * 90)    print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称",command_count))            except ValueError:                pass# 句式解析器,string)[0]        subject_List = subject.split(" ")        # 公开接口,执行命令        ssh = MySSH("192.168.191.3","22")        # 组合命令并执行        sentence = str(eval(subject_List[0]) + subject_List[1] + subject_List[2])        if eval(sentence):            return "IF",ssh        else:            return False    # 如果匹配到put则执行上传解析    elif re.findall(r'PUT{ (.*?) }',re.M) != []:        print("put")    return False# 指定一个剧本并运行def RunPlayBook(rule_name):    rootdir = os.getcwd() + "\rule\"    all_files = [f for f in os.Listdir(rootdir)]    for switch in all_files:        if( switch.endswith(".Json") == True):            all_switch_dir = rootdir + switch            # 寻找到需要加载的剧本地址            if( switch == rule_name):                with open(all_switch_dir,each[2]))                        print("-" * 90)                        for task in task_sequence:                            flag,obj = judge(task[0])                            if flag == "IF":                                ret = obj.BatchCMD(task[1])                                print(ret)if __name__ == '__main__':    ret = judge("IF{ ssh.GetfileSize('/etc/passwd') >= 4 }")    print(ret)

MySSH类最终封装: 通过面向对象对其进行封装,实现了查询cpu,负载,内存利用率,磁盘容量,等通用数据的获取。

import paramiko,math,Jsonclass MySSH:    def __init__(self,default_port):        self.address = address        self.default_port = default_port        self.username = username        self.password = password    # 初始化,远程模块    def Init(self):        try:            self.ssh_obj = paramiko.SSHClIEnt()            self.ssh_obj.set_missing_host_key_policy(paramiko.autoAddPolicy())            self.ssh_obj.connect(self.address,look_for_keys=False)            self.sftp_obj = self.ssh_obj.open_sftp()        except Exception:            return False    # 执行非交互命令    def BatchCMD(self,stderr = self.ssh_obj.exec_command(command,"")                return result            else:                return None        except Exception:            return None    # 将远程文件下载到本地    def GetRemotefile(self,local_path):        try:            self.sftp_obj.get(remote_path,local_path)            return True        except Exception:            return False    # 将本地文件上传到远程    def PutLocalfile(self,remotepath):        try:            self.sftp_obj.put(localpath,remotepath)            return True        except Exception:            return False    # 关闭接口    def CloseSSH(self):        try:            self.sftp_obj.close()            self.ssh_obj.close()        except Exception:            pass    # 获取文件大小    def GetfileSize(self,file_path):        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))    # 获取系统型号    def GetSystemVersion(self):        return self.BatchCMD("uname")    # 检测目标主机存活状态    def GetPing(self):        try:            if self.GetSystemVersion() != None:                return True            else:                return False        except Exception:            return False    # 获取文件列表,并得到大小    def GetfileList(self,path):        try:            ref_List = []            self.sftp_obj.chdir(path)            file_List = self.sftp_obj.Listdir("./")            for sub_path in file_List:                dict = {}                file_size = self.GetfileSize(path + sub_path)                dict[path + sub_path] = file_size                ref_List.append(dict)            return ref_List        except Exception:            return False    # 将远程文件全部打包后拉取到本地    def GetTarPackageAll(self,path):        try:            file_List = self.sftp_obj.Listdir(path)            self.sftp_obj.chdir(path)            for packagename in file_List:                self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packagename))                self.sftp_obj.get("/tmp/{}.tar.gz".format(packagename),"./file/{}.tar.gz".format(packagename))                self.sftp_obj.remove("/tmp/{}.tar.gz".format(packagename))                return True        except Exception:            return True    # 获取磁盘空间并返回字典    def GetAlldiskSpace(self):        ref_dict = {}        cmd_dict = {"linux\n": "df | grep -v 'filesystem' | awk '{print  \":\" }'","AIX\n": "df | grep -v 'filesystem' | awk '{print  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    # 根据不同版本选择不同的命令                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    # 循环将其转换为字典                    for each in ref_List:                        # 判断最后是否为空,过滤最后一项                        if each != "":                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])            return ref_dict        except Exception:            return False    # 获取系统内存利用率    def GetAllMemSpace(self):        cmd_dict = {"linux\n": "cat /proc/meminfo | head -n 2 | awk '{print }' | xargs | awk '{print  \":\" }'","AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    # 根据不同版本选择不同的命令                    os_ref = self.BatchCMD(run_cmd)                    # 首先现将KB转化为MB                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n","")) / 1024)                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)                    # 计算占用空间百分比                    percentage = 100 - int(mem_free / int(mem_total / 100))                    # 拼接字典数据                    return {"Total": str(mem_total),"Free": str(mem_free),"Percentage": str(percentage)}        except Exception:            return False    # 获取系统进程信息,并返回字典格式    def GetAllProcessspace(self):        ref_dict = {}        cmd_dict = {"linux\n": "ps aux | grep -v 'USER' | awk '{print  \":\" }' | uniq","AIX\n": "ps aux | grep -v 'USER' | awk '{print  \":\" }' | uniq"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])            return ref_dict        except Exception:            return False    # 获取cpu利用率    def GetcpuPercentage(self):        ref_dict = {}        cmd_dict = {"linux\n": "vmstat | tail -n 1 | awk '{print  \":\"  \":\" }'","AIX\n": "vmstat | tail -n 1 | awk '{print  \":\"  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            each = each.split(":")                            ref_dict = {"us": each[0],"sys":each[1],"IDea":each[2]}            return ref_dict        except Exception:            return False    # 获取机器的负载情况    def GetLoadAVG(self):        ref_dict = {}        cmd_dict = {"linux\n": "uptime | awk '{print  \":\"  \":\" }'","AIX\n": "uptime | awk '{print  \":\"  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            each = each.replace(",","").split(":")                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}                            return ref_dict            return False        except Exception:            return False    # 获取当前系统端口状态数据,只支持linux    def GetAllPort(self):        ref_port_List = []        cmd_dict = {"linux\n": "netstat -antp | grep -vE '^[^tcp]' | grep -v '::' | awk '{print  \":\" }'","AIX\n": "uptime"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            dic = { "Address": each.split(":")[0],"Port": each.split(":")[1],"Status": each.split(":")[2] }                            ref_port_List.append(dic)                return ref_port_List            return False        except Exception:            return False    # 修改当前用户密码    def SetPasswd(self,password):        try:            os_ID = self.BatchCMD("ID | awk {'print '}")            print(os_ID)            if(os_ID == "uID=0(root)\n"):                self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))                return True        except Exception:            return False# 定义超类,集成基类MySSHclass SupeRSSH(MySSH):    def __init__(self,default_port):        super(SupeRSSH,self).__init__(address,default_port)

我们继续为上面的代码加上命令行,让其可以直接使用,这里需要遵循一定的格式规范,我们使用JsON解析数据,JsON格式如下.

{  "aix":  [    ["192.168.1.1","123123"],["192.168.1.1","2019"],],"suse":  [    ["192.168.1.1","centos":    [    ["192.168.1.1",]}

接着是主程序代码,如下所示.

# -*- Coding: utf-8 -*-from MySSH import MySSHimport Json,argparseclass InitJson():    def __init__(self,db):        self.db_name = db    def GetPlatform(self,plat):        with open(self.db_name,enCoding="utf-8") as Read_Pointer:            load_Json = Json.loads(Read_Pointer.read())            for k,v in load_Json.items():                try:                    if k == plat:                        return v                except Exception:                    return None        return Noneif __name__ == "__main__":    ptr = InitJson("database.Json")    parser = argparse.ArgumentParser()    parser.add_argument("-G","--group",dest="group",help="指定主机组")    parser.add_argument("-C","--cmd",dest="cmd",help="指定CMD命令")    parser.add_argument("--get",dest="get",help="指定获取数据类型<Ping>")    parser.add_argument("--dst",dest="dst_file",help="目标位置")    parser.add_argument("--src",dest="src_file",help="原文件路径")    args = parser.parse_args()    # 批量CMD --group=aix --cmd=ls    if args.group and args.cmd:        platform = ptr.GetPlatform(args.group)        success,error = [],[]        for each in platform:            ssh = MySSH(each[0],22)            if ssh.Init() != False:                print("-" * 140)                print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10} \t 命令: {3:30}".                      format(each[0],args.cmd))                print("-" * 140)                print(ssh.BatchCMD(args.cmd))                ssh.CloseSSH()                success.append(each[0])            else:                error.append(each[0])                ssh.CloseSSH()        print("\n\n","-" * 140,"\n 执行报告 \n","\n失败主机: {}\n".format(error),"-" * 140)    # 批量获取主机其他数据 --group=centos --get=Ping    if args.group and args.get:            platform = ptr.GetPlatform(args.group)            success,[]            for each in platform:                ssh = MySSH(each[0],22)                # 判断是否为Ping                if ssh.Init() != False:                    if args.get == "Ping":                        ret = ssh.GetPing()                        if ret == True:                            print("[*] 主机: {} 存活中.".format(each[0]))                    # 收集磁盘数据                    elif args.get == "disk":                        print("-" * 140)                        print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10}".                              format(each[0],each[2]))                        print("-" * 140)                        ret = ssh.GetAlldiskSpace()                        for k,v in ret.items():                            if (v.split("%")[0] != "-"):                                space_ret = int(v.split("%")[0])                                if space_ret >= 70:                                    print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k,v))                                    continue                                if space_ret >= 50:                                    print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k,v))                                    continue                                else:                                    print("磁盘分区: {0:30} \t 磁盘占用: {1:5}".format(k,v))                                    continue                        print()                else:                    error.append(each[0])                    ssh.CloseSSH()            print("\n\n","-" * 140)    # 实现文件上传过程 --group=centos --src=./a.txt --dst=/tmp/test.txt    if args.group and args.src_file and args.dst_file:        platform = ptr.GetPlatform(args.group)        success,22)            if ssh.Init() != False:                ret = ssh.PutLocalfile(args.src_file,args.dst_file)                if ret == True:                    print("主机: {} \t 本地文件: {} \t ---> 传到: {}".format(each[0],args.src_file,args.dst_file))            else:                error.append(each[0])                ssh.CloseSSH()        print("\n\n","-" * 140)
总结

以上是内存溢出为你收集整理的Python实现Paramiko的二次封装全部内容,希望文章能够帮你解决Python实现Paramiko的二次封装所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1157940.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-01
下一篇 2022-06-01

发表评论

登录后才能评论

评论列表(0条)

保存