') try: print('开始尝试:',mode) #post、delete、put if mode!='get': header_dict = {'Content-Type': 'application/json;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign} if mode=='post': if rest_type == '/oapi/v1/job/create': header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign} json_str=json2Params(data_json) url = url+'?'+json_str res = requests.post(url, data=str(data_json), headers=header_dict,verify=False) else: json_str=json2Params(data_json) url = url+'?'+json_str header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign} res = requests.post(url, data=str(data_json), headers=header_dict,verify=False) if mode=='put': res = requests.put(url, data=str(data_json), headers=header_dict,verify=False) if mode=='delete': res = requests.delete(url, data=str(data_json), headers=header_dict,verify=False) if mode=='get': header_dict = {'accesstoken':accesstoken,'timestamp':timestamp,'signature':sign} res = requests.get(url,headers=header_dict,verify=False) #获取返回值 res_text=res.text #转化成json get_field_json=json.loads(res_text) print('请求成功!') break except Exception as e: print('请求失败:',e) get_field_json={'code': 40, 'msg': 'fail,发送API失败!','result':None} return get_field_jsondef get_token(host='',port='',accessKey='',secretKey='',retry=2): ''' host:地址,str型,示例:'https://192.168.202.11' port:int型,https端口 accessKey:str型,服务平台应用AppKey secretKey:str型,服务平台应用AppSecret retry:int型,重试次数 ''' get_field_json={'code': 44, 'msg': 'fail,获取token失败','result':None} for i in range(retry): try: url = host if port != 443: url = url + ':' + str(port) json_str='accessKey='+accessKey+'&secretKey='+secretKey url = url+'/oapi/v1/token?'+json_str res = requests.get(url,verify=False) res_text=res.text #转化成json get_field_json=json.loads(res_text) print('获取token,第'+str(i+1)+'次,成功') break except Exception as e: print('获取token,第'+str(i+1)+'次,失败',e) time.sleep(1) return get_field_json def refresh_token(host='',port='',refleshtoken='',retry=2): ''' host:地址,str型,示例:'https://192.168.202.11' port:int型,https端口 refleshtoken:str型,刷新token retry:int型,重试次数 ''' get_field_json={'code': 44, 'msg': 'fail,刷新token失败','result':None} for i in range(retry): try: url = host if port != 443: url = url + ':' + str(port) json_str='refreshToken='+refleshtoken url = url+'/oapi/v1/token?'+json_str res = requests.get(url,verify=False) res_text=res.text #转化成json get_field_json=json.loads(res_text) print('刷新token,第'+str(i+1)+'次,成功') break except Exception as e: print('刷新token,第'+str(i+1)+'次,失败',e) time.sleep(1) return get_field_json4. 其它平台或客户端调用4.1 其它平台调用按照第 4 章的逻辑自行写调用代码即可。
4.2 机器人调用按照第 4 章添加一个全局函数,在需要调用的地方使用全局函数控件即可。
注:需要提前获取token后调用,如无第三方平台对接,获取的token可存在共享变量里,分配权限调用,参考下图:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)