Python 运用Paramiko实现批量巡检

Python 运用Paramiko实现批量巡检,第1张

概述通过封装Paramiko这个SSH模块,我们可以实现远程批量管理Linux主机,在此基础上配合钉钉API接口可实现自动告警机制,定期自动检查设备状态,并推送到钉钉群内。 首先需要配置双网卡模式,我们将

通过封装Paramiko这个SSH模块,我们可以实现远程批量管理linux主机,在此基础上配合钉钉API接口可实现自动告警机制,定期自动检查设备状态,并推送到钉钉群内。

首先需要配置双网卡模式,我们将无线网卡配置路由让其走外网与钉钉连接,有线网口则负责与内部服务器相连接,只需要配置路由即可实现。

网络目标        网络掩码          网关       接口   跃点数0.0.0.0          0.0.0.0      132.35.93.1     132.35.93.11     210.0.0.0          0.0.0.0    192.168.191.1    192.168.191.3     25 C:\windows\system32> route delete 0.0.0.0# 所有的外网访问走无线网卡,从网关 192.168.191.1 出去C:\windows\system32>route -p add 0.0.0.0 mask 0.0.0.0 192.168.191.1# 如果是132网段,则走内部,网关为:132.35.93.1C:\windows\system32>route -p add 132.35.0.0 mask 255.255.0.0 132.35.93.1

封装钉钉接口: 接口的调用需要传入需要通知特定人的手机号,这个模块命名为Ding.py 代码如下。

import requestsimport urllib.parseimport datetime,time,hmac,hashlib,base64,Jsonclass DingToken():    def __init__(self,atAll,atMobiles):        self.atAll = atAll        self.atMobiles = atMobiles    def send_message(self,message):        timestamp = str(round(time.time() * 1000))        secret = 'SEC1018485caf7339e38530b'        secret_enc = secret.encode('utf-8')        string_to_sign = '{}\n{}'.format(timestamp,secret)        string_to_sign_enc = string_to_sign.encode('utf-8')        hmac_code = hmac.new(secret_enc,string_to_sign_enc,digestmod=hashlib.sha256).digest()        sign = urllib.parse.quote(base64.b64encode(hmac_code))        headers={'Content-Type': 'application/Json'}        webhook = 'https://oAPI.dingtalk.com/robot/send?access_token=0fe10f&timestamp=' + timestamp + "&sign=" + sign        data = {            "msgtype": "text","text": {"content": message },"at": {                "atMobiles": [ self.atMobiles ],"isAtAll": self.atAll                }            }        requests.post(webhook,data=Json.dumps(data),headers=headers)    # 发送警告信息    def send_warning(self,platform,person,group,address,send_date,type,message):        self.send_message(                    "------------------------------------------------------- \n"                    "\t\t\t\t\t {0} \n"                    "------------------------------------------------------- \n"                    "维护人员: \t {1} \n"                    "所在分组: \t {2} \n"                    "系统地址: \t {3} \n"                    "告警日期: \t {4} \n"                    "告警类型: \t {5} \n"                    "------------------------------------------------------- \n"                    "{6} \n"                    "-------------------------------------------------------".                        format(platform,message) )    # 发送Ping连通性报告    def send_Ping(self,success_len,error_len,error_List):        self.send_message(            "------------------------------------------------------- \n"            "\t\t\t\t\t {0} \n"            "------------------------------------------------------- \n"            "[*] 日期: \t {1} \n[+] 连通主机数: \t {2} \n[-] 失败主机数: \t {3} \n"            "------------------------------------------------------- \n"            "失败主机列表: \n {4} \n".format(platform,error_List)        )if __name__ == "__main__":    ding = DingToken(False,"15646596977")    ding.send_warning("总部客服","王瑞","CTI服务组","192.168.1.1","2021:01:01","磁盘异常","C:// \t 100% \n")


封装好的MySSH模块: 接着就是封装一个拉取数据到本地的SSH模块,这个模块命名为 MySSH.py 代码如下。

import paramiko,math,Jsonclass MySSH:    def __init__(self,username,password,default_port):        self.address = address        self.default_port = default_port        self.username = username        self.password = password    def Init(self):        try:            self.ssh_obj = paramiko.SSHClIEnt()            self.ssh_obj.set_missing_host_key_policy(paramiko.autoAddPolicy())            self.ssh_obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=False,look_for_keys=False)            self.sftp_obj = self.ssh_obj.open_sftp()        except Exception:            return False    def BatchCMD(self,command):        try:            stdin,stdout,stderr = self.ssh_obj.exec_command(command,timeout=3)            result = stdout.read()            if len(result) != 0:                result = str(result).replace("\n","\n")                result = result.replace("b'","").replace("'","")                return result            else:                return None        except Exception:            return None    def CloseSSH(self):        try:            self.sftp_obj.close()            self.ssh_obj.close()        except Exception:            pass    def GetSystemVersion(self):        return self.BatchCMD("uname")    # 测试主机连通率    def GetPing(self):        try:            if self.GetSystemVersion() != None:                print("{} 已连通.".format(self.address))                return True            else:                return False        except Exception:            return False    # 拉取磁盘数据到本地,并返回字典    def GetAlldiskSpace(self):        ref_dict = {}        cmd_dict = {"linux\n": "df | grep -v 'filesystem' | awk '{print  \":\" }'","AIX\n": "df | grep -v 'filesystem' | awk '{print  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])            print("利用率字典: {}".format(ref_dict))            return ref_dict        except Exception:            return False    # 拉取内存数据到本地。    def GetAllMemSpace(self):        cmd_dict = {"linux\n": "cat /proc/meminfo | head -n 2 | awk '{print }' | xargs | awk '{print  \":\" }'","AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n","")) / 1024)                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)                    percentage = 100 - int(mem_free / int(mem_total / 100))                    print("利用百分比: {}  \t 总内存: {}  \t 剩余内存: {}".format(percentage,mem_total,mem_free))                    return str(percentage) + " %"        except Exception:            return False    # 获取cpu利用率数据    def GetcpuPercentage(self):        ref_dict = {}        cmd_dict = {"linux\n": "vmstat | tail -n 1 | awk '{print  \":\"  \":\" }'","AIX\n": "vmstat | tail -n 1 | awk '{print  \":\"  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            each = each.split(":")                            ref_dict = {"us": each[0],"sys": each[1],"IDea": each[2]}            print("cpu利用率数据: {}".format(ref_dict))            return ref_dict        except Exception:            return False    # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载    def GetLoadAVG(self):        ref_dict = {}        cmd_dict = {"linux\n": "cat /proc/loadavg | awk '{print  \":\"  \":\" }'","AIX\n": "uptime | awk '{print  \":\"  \":\" }'"                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ref_List = os_ref.split("\n")                    for each in ref_List:                        if each != "":                            each = each.replace(",","").split(":")                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}                            print("负载利用率: {}".format(ref_dict))                            return ref_dict            return False        except Exception:            return False    # 检测指定进程是否存活    def CheckProcessstatus(self,processname):        cmd_dict = {"linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print '}"),"AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print '}")                    }        try:            os_version = self.GetSystemVersion()            for version,run_cmd in cmd_dict.items():                if (version == os_version):                    os_ref = self.BatchCMD(run_cmd)                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())                    if ret_flag != "0":                        return True            return False        except Exception:            return "None"    # 检测指定端口是否存活    def CheckPortStatus(self,port):        cmd_dict = {"linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print '}"),"AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print '}")                    }        try:            os_version = self.GetSystemVersion()            for version,"").strip())                    if ret_flag == "ListEN" or ret_flag == "ESTABliSHED":                        return True            return False        except Exception:            return False

定义巡检过程: 这个模块是最重要的一个模块,主要负责解析JsON文件并巡检,该模块我们就命名为system.py,代码如下:

from Ding import DingTokenfrom MySSH import MySSHimport os,sys,datetime,Json# --------------------------------------------------------------------------------------------------# 对所有主机设备进行def switch_Ping():    with open("./config.Json","r",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        system = ptr.get("system")        success,error = [],[]        try:            for system_List in system:                for k,v in system_List.items():                    for item in v:                        Now = datetime.datetime.Now()                        print("[+] 监控范围: Ping测试 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(Now,'%Y-%m-%d %H:%M:%s'),item[0]))                        #Inspectcpu(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit"))                        ssh = MySSH(item[0],ptr.get("default-port"))                        ssh.Init()                        ref = ssh.GetPing()                        if ref == True:                            success.append(item[0])                        else:                            error.append(item[0])            ding = DingToken(False,ptr.get("telephone"))            ding.send_Ping(ptr.get("platform"),len(success),len(error),error)        except Exception:            pass# --------------------------------------------------------------------------------------------------# 磁盘告警流程# 手机号 地址 账号 密码 端口 系统名称 姓名 分组 磁盘最大值def Inspectdisk(telephone,port,disk_limit):    ding = DingToken(False,telephone)    ssh = MySSH(address,port)    ssh.Init()    ret = ssh.GetAlldiskSpace()    ssh.CloseSSH()    if ret != False:        for k,v in ret.items():            try:                space = eval(v.split("%")[0])                if space >= int(disk_limit):                    Now = datetime.datetime.Now()                    strNow = datetime.datetime.strftime(Now,'%Y-%m-%d %H:%M:%s')                    ding.send_warning(platform,strNow,"磁盘过载","[ 分区: {} \t | \t 负载率: {} ]\n".format(k,v))            except Exception:                passdef switch_inspect_disk():    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        system = ptr.get("system")        for system_List in system:            # 循环所有字典            for k,v in system_List.items():                # 循环每个分组内的主机                for item in v:                    Now = datetime.datetime.Now()                    print("[+] 监控范围: 磁盘检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))                    # Inspectdisk("15646596977","192.168.191.4","root","1233","22","总部客服(呼叫中心平台)","CTI 3.4",80)                    Inspectdisk(ptr.get("telephone"),ptr.get("disk_limit"))# --------------------------------------------------------------------------------------------------# 内存告警流程# InspectMemory("15646596977","192.168.191.3",22,"总部客服系统","CTI组",内存阈值)def InspectMemory(telephone,memory_limit):    ding = DingToken(False,port)    ssh.Init()    ret = ssh.GetAllMemSpace()    ssh.CloseSSH()    if ret != False:        try:            space = eval(ret.split("%")[0])            if space >= int(memory_limit):                Now = datetime.datetime.Now()                strNow = datetime.datetime.strftime(Now,'%Y-%m-%d %H:%M:%s')                ding.send_warning(platform,"内存过载","[ 负载率: \t | \t {} ]\n".format(ret))        except Exception:            passdef switch_inspect_memory():    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        system = ptr.get("system")        for system_List in system:            for k,v in system_List.items():                for item in v:                    Now = datetime.datetime.Now()                    print("[+] 监控范围: 内存检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))                    InspectMemory(ptr.get("telephone"),ptr.get("memory_limit"))# --------------------------------------------------------------------------------------------------# 巡检cpu利用率def Inspectcpu(telephone,cpu_limit):    ding = DingToken(False,port)    ssh.Init()    ret = ssh.GetcpuPercentage()    ssh.CloseSSH()    if ret != False:        try:            space = int(100 - int(ret.get("IDea")))            us = ret.get("us") + "%"            sys = ret.get("sys") + "%"            if space >= int(cpu_limit):                print("cpu 总利用率: {}".format(space))                Now = datetime.datetime.Now()                strNow = datetime.datetime.strftime(Now,"cpu 利用率过载","[ 用户态: \t | \t {} ]\n[ 内核态: \t | \t {} ]\n[ 总利用率: \t | \t {}% ]\n".format(us,space))        except Exception:            passdef switch_inspect_cpu():    with open("./config.Json",v in system_List.items():                for item in v:                    Now = datetime.datetime.Now()                    print("[+] 监控范围: cpu检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))                    Inspectcpu(ptr.get("telephone"),ptr.get("cpu_limit"))# --------------------------------------------------------------------------------------------------# 巡检系统平均负载def InspectLoadAvg(telephone,load_avg_limit):    ding = DingToken(False,port)    ssh.Init()    ret = ssh.GetLoadAVG()    ssh.CloseSSH()    if ret != False:        try:            if float(ret.get("1avg")) >= float(load_avg_limit[0]) or float(ret.get("5avg")) >= float(load_avg_limit[1]) or float(ret.get("15avg")) >= float(load_avg_limit[2]):                Now = datetime.datetime.Now()                strNow = datetime.datetime.strftime(Now,"LoadAvg 过载","[ 一分钟负载: \t | \t {} ]\n[ 五分钟负载: \t | \t {} ]\n[ 十五分钟负载: \t | \t {} ]\n".                                  format(ret.get("1avg"),ret.get("5avg"),ret.get("15avg")))        except Exception:            passdef switch_inspect_avg():    with open("./config.Json",v in system_List.items():                for item in v:                    Now = datetime.datetime.Now()                    print("[+] 监控范围: 系统负载检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,item[0]))                    InspectLoadAvg(ptr.get("telephone"),List(ptr.get("load_avg_limit")))# --------------------------------------------------------------------------------------------------# 巡检系统端口是否开放# 首先传入一个IP地址,解析出其用户名密码表def GetAddressAsPasswd(address):    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        for each_List in ptr.get("system"):            for k,v in each_List.items():                for ser in v:                    if ser[0] == address:                        return ser    return False# 拼接字符串数据def append_string(src_str,append_str):    for x in range(len(append_str)):        src_str += append_str[x]    return src_str# 检查端口是否被关闭了,如果关闭了则提示关闭def InspectPort(telephone,person):    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        port = ptr.get("port")        default_port = ptr.get("default-port")        for each_List in port:            for k,v in each_List.items():                for item in v:                    this_user_passwd = GetAddressAsPasswd(item[0])                    if this_user_passwd != False:                        try:                            Now = datetime.datetime.Now()                            print("[+] 监控范围: 端口状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(                                                                                                      Now,item[0]))                            ssh = MySSH(this_user_passwd[0],this_user_passwd[1],this_user_passwd[2],default_port)                            ssh.Init()                            close_port_List = []                            for check_port in range(1,len(item)):                                ref = ssh.CheckPortStatus(item[check_port])                                print("端口ID: {} \t 状态位: {} 存活端口: {}".format(check_port,ref,item[check_port]))                                if ref == False or ref == "None":                                    close_port_List.append(item[check_port])                                    print("端口ID: {} \t 状态位: {} 关闭端口: {}".format(check_port,item[check_port]))                            ding = DingToken(False,telephone)                            send = ""                            for i in close_port_List:                                im = "[ 端口: {0} \t\t | \t 状态: 已关闭 ]\n".format(i)                                send = append_string(send,im)                            if(send != ""):                                ding.send_warning(platform,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.Now(),"端口关闭",send)                            ssh.CloseSSH()                        except Exception:                            passdef switch_inspect_port():    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        telephone = ptr.get("telephone")        platform = ptr.get("platform")        person = ptr.get("person")        InspectPort(telephone,person)# --------------------------------------------------------------------------------------------------# 巡检系统是否开放指定进程def InspectProcess(telephone,enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        process = ptr.get("process")        default_port = ptr.get("default-port")        for each_List in process:            for k,v in each_List.items():                for item in v:                    this_user_passwd = GetAddressAsPasswd(item[0])                    if this_user_passwd != False:                        try:                            Now = datetime.datetime.Now()                            print("[+] 监控范围: 进程状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,default_port)                            ssh.Init()                            close_process_List = []                            for check_process in range(1,len(item)):                                ref = ssh.CheckProcessstatus(item[check_process])                                print("进程ID: {} \t 状态位: {} 存活进程: {}".format(check_process,item[check_process]))                                if ref == False or ref == "None":                                    close_process_List.append(item[check_process])                                    print("进程ID: {} \t 状态位: {} 关闭进程: {}".format(check_process,item[check_process]))                            ding = DingToken(False,telephone)                            send = ""                            for i in close_process_List:                                im = "[ 进程: {0} \t\t | \t 状态: 已关闭 ]\n".format(i)                                send = append_string(send,"进程退出",send)                            ssh.CloseSSH()                        except Exception:                            passdef switch_inspect_process():    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        telephone = ptr.get("telephone")        platform = ptr.get("platform")        person = ptr.get("person")        InspectProcess(telephone,person)

以磁盘为例,当超过了我们定义好的阈值时,则会通过钉钉提示用户告警,告警提示如下。

定义配置文件: 配置文件则是巡检时需要解析的内容,我们需要依次写入账号密码等信息。

{    "system_config":    [        {"Ping_": "True"},{"disk_": "True"},{"cpu_": "False"},{"loadavg_": "False"},{"memory_": "False"},{"process_": "False"},{"port_": "False"}    ],"platform": "总部客服系统(呼叫中心)","person": "王瑞","telephone": "18264825669","e-mail": "admin@lyshark.com","default-port": "22","set-timeout": "3600","disk_limit": 75,"memory_limit": 95,"cpu_limit": 90,"load_avg_limit": ["5.0","8.0","10.0"],"system":    [        {        "Centos 服务器组":            [                ["192.168.1.1","1233"],["192.168.1.1","1233"]            ],"AIX 服务器组":            [                ["192.168.1.1","1233"]            ]        }    ],"process":    [        {        "CTI进程组":            [                ["192.168.1.1","icdcomm","aplogic","ctilink","cnfgsvr","mcp"],"mcp","ccsapp","nis","oas","ivr","ctiserver"]            ]        }    ],"port":    [        {        "CTI端口检测":            [                ["192.168.1.1","111","631","8888","19001","25","2556","5150","5600","34902","10000","34901","10005"],"8080","10004","10005","60661"]            ]        }    ]}

定义main入口代码: 入口代码主要负责解析参数与巡检,由于怕影响服务器性能,所有没加多线程支持,不过也够用了。

import systemimport Json,timeif __name__ == "__main__":    with open("./config.Json",enCoding="utf-8") as read_config_ptr:        ptr = Json.loads(read_config_ptr.read())        timeout = ptr.get("set-timeout")        configure = ptr.get("system_config")        # 默认开关全部关闭        disk_ = "False"        cpu_ = "False"        memory_ = "False"        loadavg_ = "False"        process_ = "False"        port_ = "False"        Ping_ = "False"        # 判断配置文件开关是否开启,如果开启了则将内存的开关开启        for dic in configure:            for k,v in dic.items():                if k == "Ping_" and v == "True":                    Ping_ = "True"                if k == "disk_" and v == "True":                    disk_ = "True"                if k == "cpu_" and v == "True":                    cpu_ = "True"                if k == "memory_" and v == "True":                    memory_ = "True"                if k == "loadavg_" and v == "True":                    loadavg_ = "True"                if k == "process_" and v == "True":                    process_ = "True"                if k == "port_" and v == "True":                    port_ = "True"        while True:            if Ping_ == "True":                system.switch_Ping()            if disk_ == "True":                system.switch_inspect_disk()            if cpu_ == "True":                system.switch_inspect_cpu()            if memory_ == "True":                system.switch_inspect_memory()            if loadavg_ == "True":                system.switch_inspect_avg()            if process_ == "True":                system.switch_inspect_process()            if port_ == "True":                system.switch_inspect_port()            time.sleep(int(timeout))

实现正则解析: 前面的配置无法实现交互式问答,智能机器人推送数据,很被动,我们需要开启交互机器人,自己实现业务逻辑。

import os,redef participle(text):    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)    address = ".".join(ref[0])    select = re.findall('查询',text)    if(len(address) !=0 and len(select) != 0):        if len (re.findall('内存',text)) != 0:            return [address,"内存"]        elif len (re.findall('磁盘',"磁盘"]        elif len (re.findall('负载',"负载"]if __name__ == "__main__":    text = "帮我查询192.168.1.1主机的内存负载情况"    text2 = "帮我查询 192.168.1.200 这 台设 备的 磁盘 使用率"    text3 = "需要查询192.168.1.200 设备中负载使用情况,请反馈结果"    text4 = "发现负载出现告警,请立即查询192.168.1.200这台设备的负载使用率,并立即反馈"    print(participle(text))    print(participle(text2))    print(participle(text3))    print(participle(text4))    txt = '''    主编单位:总部客服系统    '''    addr = re.findall('(主编单位:.*?)\s',txt)[0]    print(addr)

这个案例,从文本中提取可用信息,并将其返回为列表格式。

接着配置钉钉开发者平台机器人,此处需要有公网地址,作用是,钉钉群有人at机器人时,机器人会将请求post发送到我们的django应用服务上。

使用django配置接收请求并处理即可,处理代码如下所示。

from django.http import httpResponse,JsonResponseimport Json,base64import sys,os,re# 机器人 app_secretapp_secret = "LcKEf0nlqe"# 根据IP查询 *** 作使用def select_participle(text):    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)    # 判断如果地址不为空,则进行查询等 *** 作    if len(ref) != 0:        address = ".".join(ref[0])        select = re.findall('查询',text)        if(len(address) !=0 and len(select) != 0):            if len (re.findall('内存',text)) != 0:                return [address,"内存"]            elif len (re.findall('负载',"负载"]    else:        if len(re.findall('主机列表',text)) != 0:            return ["主机列表"]def index(request):    if request.method == "POST":        http_SIGN = request.Meta.get("http_SIGN")        http_TIMESTAMP = request.Meta.get("http_TIMESTAMP")        res = Json.loads(request.body)        # 接收用户的输入并做处理        sendnick = res.get("senderNick")        is_admin = res.get("isadmin")        conver = res.get("conversationTitle")        content = res.get("text").get("content")        print("发送人: {} 是否管理员: {} 所在组: {} 接收数据: {}".format(sendnick,is_admin,conver,content))        # 数据的加密解密        string_to_sign = '{}\n{}'.format(http_TIMESTAMP,app_secret)        string_to_sign_enc = string_to_sign.encode('utf-8')        hmac_code = hmac.new(app_secret.encode("utf-8"),digestmod=hashlib.sha256).digest()        sign = base64.b64encode(hmac_code).decode("utf-8")        ref_select_text = select_participle(content)        if len(ref_select_text) == 2:            if sign == http_SIGN:                if "负载" in ref_select_text[1]:                    return JsonResponse(                    {"msgtype": "text","text": {                             "content": "经过查询,主机: {} 负载率为: 10% 当前为正常状态".format(ref_select_text[0])                         }                    })                if "内存" in ref_select_text[1]:                    return JsonResponse\                    (                        {"msgtype": "text","text":                            {                             "content": "经过查询,主机: {} 内存占用率为: 70% 当前为正常状态".format(ref_select_text[0])                            }                         }                    )        elif len(ref_select_text) == 1:            if "主机列表" in ref_select_text[0]:                return JsonResponse \                        (                        {"msgtype": "text","text":                             {                                 "content": "目前您所管理的主机,总结: 205台"                             }                         }                    )        return JsonResponse({"error": "没有权限"})    if request.method == "GET":        return httpResponse("没有权限")

django运行后,我们回到钉钉群内,问机器人一些问题看看吧,前提是这些问题已经定义了正则解析。

django 后台则能够接收到,这些数据,并做出回应。

总结

以上是内存溢出为你收集整理的Python 运用Paramiko实现批量巡检全部内容,希望文章能够帮你解决Python 运用Paramiko实现批量巡检所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1157907.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-01
下一篇 2022-06-01

发表评论

登录后才能评论

评论列表(0条)

保存