我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用一些开源工具的sdk和api,或是运行一些耗时比较大的任务(单个大任务下可能有多个小任务),需要一段时间才能提供执行结果,而前端同事要求不能让用户在页面等待,需要马上提供一个返回结果给他,任务执行完后可以拿到最终结果,并且用户退出web界面或浏览器异常关闭之后,再次返回界面,执行的过程不会中断,并且支持多用户同时执行不同 *** 作的需要。
很明显,这是一个-异步多线程-的场景,在Python中可以想到的有:
1.引入Asyncio模块,利用多协程实现。
2.使用Threading模块,自己编写线程任务,线程等待,睡眠,释放线程的过程。
3.使用异步框架,例如Cerely、Tornado、Twisted等等,装饰异步任务。
这里边最便捷且开发效率最高的应该是使用异步框架,咱们选择使用Celery来实现这个需求。
Celery介绍:截图与描述来自celery官网:Celery - Distributed Task Queue — Celery 5.2.0 documentation
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为 *** 作提供维护此类系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
Celery 拥有庞大而多样化的用户和贡献者社区,您应该加入我们的 IRC 或我们的邮件列表。
Celery 是开源的,并在BSD 许可下获得许可。
消费者与消费结果:我们除了需要Celery做异步任务的处理,还需要一个中间件来充当消费者,并保存最终的任务处理结果(消费结果),这里有很多中间件可以选,例如常用的消息中间件,rabbitmq,kafka等,还可以使用mysql,redis等作为消费者并保存消费结果(因为最终的处理结果要返回给前端同事),楼主最终选择了redis。
Redis安装与配置:这里不再赘述windows下安装redis步骤,只介绍linux下安装redis与配置,我的机器是centos7.6:
yum方式安装(注意:这样安装的redis不是最新版本的,如有对版本要求比较高的,建议去官网下载源码包去手动安装,官网地址:Redis,最新版本:6.2.6)
yum -y install redis
安装完成之后配置redis.conf文件:
vi /etc/redis.conf
修改这一行,改成 0.0.0.0,这样别的应用和组件才可以访问到redis的服务与端口:
同理,redis的默认端口也可以在此配置里修改:
还有一些关闭匿名访问,设置密码等配置的修改,项目若要上到公网环境下,建议配置。
启动并测试redis服务功能是否正常:
启动redis:
redis-cli -h 0.0.0.0
测试redis:
1 redis> set name "zzz" 2 3 OK 4 5 redis> get name 6 7 "zzz"
记住,代码并没有实际引用redis,但也需要安装redis模块,否则会报错。(redis模块版本不要太高,高了也会报错,这些坑都是楼主亲自趟过的,我这里使用2.10.6)
pip install redis==2.10.6Celery的安装和配置:
windos和linux下都可以使用pip安装:
pip install celery==3.1.25
我的项目目录:(celeryconfig.py与__init__.py文件为celery与redis配置文件):
在项目中先创建一个名为config的python目录,并在__init__.py中导入celery模块并配置:
__init__.py:
from celery import Celery,platforms platforms.C_FORCE_ROOT = True app = Celery('prod') # 创建 Celery 实例 app.config_from_object('kernel.config.celeryconfig') # 通过 Celery 实例加载配置模块
platforms.C_FORCE_ROOT = True 这个配置一定要有,否则会报权限问题。
在config目录下的celeryconfig.py中配置任务队列消费者与消费结果保存在redis的地址:
celeryconfig.py:
## celery配置 BROKER_URL = 'redis://redis-host:6379/1' # 指定 Broker消费者,我们使用redis 1号数据库 CELERY_RESULT_BACKEND = 'redis://redis-host:6379/2' # 指定 Backend,最终消费结果,我们使用redis 2号数据库 CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC CELERY_importS = ( # 指定导入的任务模块 'kernel.views.api' ## 异步任务代码文件路径即可 )
至此,前期需要的工具准备工作全部完毕,我们开始我们的开发任务。
异步任务开发:楼主因为主要负责后端这块,这里选择使用flask来写,整体的项目模块与版本,大概罗列下:
Python 3.5.4
Mysql 5.5.64
Celery==3.1.25
Flask==1.1.4
Redis==2.10.6
这时我们与前端同事再次详细沟通了下,初步约定如下:
1.前端通过form表单传数据给后端,格式为json,分析:需要解析json数据。
2.因为存在长耗时的任务,要求一旦前端请求过来,后端要马上返回一个中间结果给前端(这样解决了前端页面等待的问题),分析:需要马上提供一个返回结果。
3.前端最终要拿到任务的最终执行结果,分析:我们需要把长耗时异步任务的最终结果推送给前端,需要任务代码最后推送执行结果。(自己先定义回调接口去测试)
1.后端Flask接口代码: 文件名称与路径:项目名称-kernel-view-api.py,与celery配置下的任务模块对应。
api.py:
# -*- coding: utf-8 -*- import json, sys import logging import requests import datetime,pymysql import os,subprocess from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response from kernel.models.playbook import PlayBook_file from kernel.utils import render_response, Retval from kernel.models import db from sqlalchemy import or_,text import gitlab ## 导入gitlab模块 from kernel.config import app, cmdb_config,hcacp_config import pymysql,uuid,hashlib,time from datetime import timezone bp = Blueprint('test', __name__) ## 蓝图自己定义,这里只是实例化 log = logging.getLogger(__name__) ## 日志自己定义,这里只是实例化 class status: ## 定义一些状态码 success = 0 warning = 1 pending = 2 faild = -1 ## 回调接口 @bp.route('/test/callback/', methods=['GET', 'POST']) def ansible_aaa(): data1 = request.get_data(as_text=True) # data2 = json.loads(data1) log.info(data1) return data1 @bp.route('/test/add/', methods=['POST', 'GET']) def devops_add(): ''' 获取form表单json数据 ''' # return True try: data = request.get_data() _data = json.loads((str(data, 'utf-8'))) print(_data) except Exception as requestdata_except: log.error('获取表单数据异常,异常原因:%s' % requestdata_except) return render_response(status.faild, u"获取表单数据异常,异常原因:%s" % requestdata_except, {}) ## 获取标识tag的结果 try: ''' 工单json数据要带工单标识符select_tag: create_project:新建项目申请工单 ''' select_tag = _data.get('select_tag') except Exception as request_select_tag_except: log.error('获取表单需求标识select_tag异常,异常原因:%s' % request_select_tag_except) return render_response(status.faild, u"获取表单需求标识select_tag异常,异常原因:%s" % request_select_tag_except, {}) try: """ !--当参数select_tag == create_project 时,建立项目--! """ if select_tag == 'create_project': projname = _data.get('projname') add_project_result = add_project.delay(cmdb_config, _data) return render_response(status.pending, u"devops系统添加项目工单任务执行中--pending--", {'项目中文名称': projname}) except Exception as do_celery_job_except: log.error('执行异步celery任务异常,异常原因:%s' % do_celery_job_except) return render_response(status.faild, u"执行异步celery任务异常,异常原因:%s" % do_celery_job_except, {})
这里代表前端请求过来之后,马上返回一个执行结果,满足需求2:
在devops_add接口里执行异步任务:
add_project_result = add_project.delay(cmdb_config, _data)
官网的示例:
## 1.扩号里为异步任务所需的参数
## 2.add_project_result 是异步任务执行的对象,包含很多属性方法,下边介绍一些常用的:
获取任务结果和状态:
add_project_result = task.apply_async()
add_project_result.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
add_project_result.wait() # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
add_project_result.get(timeout=1) # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
add_project_result.result # 任务执行结果,未完成返回None;
add_project_result.state # PENDING, START, SUCCESS,任务当前的状态
add_project_result.status # PENDING, START, SUCCESS,任务当前的状态
add_project_result.successful # 任务成功返回true
add_project_result.traceback # 如果任务抛出了一个异常,可以获取原始的回溯信息
2.异步任务代码: 文件名称与路径:
项目名称-kernel-view-api.py
api.py
解释:
因为要满足需求3,把最终异步耗时任务的真正结果给到前端,所以我们需要在异步任务里写一个回调的 *** 作。
header = {'Content-Type': 'application/json'} ## 构造请求头和数据类型
_json = {"status": sttaus.faild, "msg": u"失败", "data": {}} ## 失败就返回给前端json类型失败
_json = {"status": sttaus.success, "msg": u"成功", "data": {}} ## 成功就返回给前端json类型成功
requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 带参回调请求
# -*- coding: utf-8 -*- import json, sys import logging import requests import datetime,pymysql import os,subprocess from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response from kernel.utils import render_response, Retval from datetime import timezone from kernel.config import * ## 导入config目录下的celery配置 bp = Blueprint('test', __name__) ## 蓝图自己定义,这里只是实例化 log = logging.getLogger(__name__) ## 日志自己定义,这里只是实例化 class status: ## 定义一些状态码 success = 0 warning = 1 pending = 2 faild = -1 ## 示例函数:一个添加信息函数,前端给我们json数据,后端接受之后去插入数据库,完成 *** 作并告诉前端 @app.task ## celery添加项目任务 def add_project(mysql_config, _data): try: ## 系统添加项目信息工单 projname = _data.get('projname') ## 项目名称,必填 prodesc= _data.get('prodesc') ## 项目描述,必填 projctime = datetime.datetime.now() ## 项目发布时间 callback_url = _data.get('callback_url') ## 回调接口地址 except Exception as describe_form_except: log.error('解析表单数据出现异常,异常原因:%s' % describe_form_except) header = {'Content-Type': 'application/json'} ## 回调接口请求头 _json = {"status": status.faild, "msg": u"失败", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) try: # 获取数据库连接 conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db) # 返回连接 cursor = conn.cursor() except Exception as connect_except: log.error('系统数据库连接出现异常,异常原因:%s' % connect_except) _json = {"status": status.faild, "msg": u"失败", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) try: proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ('{}','{}','{}');".format(projname, prodesc, projctime) cursor.execute(proj_sql) conn.commit() _json = {"status": status.success, "msg": u"成功", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 任务执行完成之后调用回调接口,返回任务执行成功结果 log.info('系统建项目工单执行成功,%s' % proj_sql) except Exception as do_add_project_except: _json = {"status": status.faild, "msg": u"失败", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) log.error('执行添加项目工单异常,异常原因:%s' % do_add_project_except) ## 任务执行完成之后调用回调接口,返回任务执行失败结果
楼主用的最简单,没有在task里写一些属性,类似下边的这种方式还可以给task添加一些属性:
@app.task(name='test',bind=True,base=baseTask)
补充介绍下异步task有的一些属性:
TASK的一般属性:
Task.name:任务名称;
Task.request:当前任务的信息;
Task.max_retries:设置重试的最大次数
Task.throws:预期错误类的可选元组,不应被视为实际错误,而是结果失败;
Task.rate_limit:设置此任务类型的速率限制
Task.time_limit:此任务的硬限时(以秒为单位)。
Task.ignore_result:不存储任务状态。默认False;
Task.store_errors_even_if_ignored:如果True,即使任务配置为忽略结果,也会存储错误。
Task.serializer:标识要使用的默认序列化方法的字符串。
Task.compression:标识要使用的默认压缩方案的字符串。默认为task_compression设置。
Task.backend:指定该任务的结果存储后端用于此任务。
Task.acks_late:如果设置True为此任务的消息将在任务执行后确认 ,而不是在执行任务之前(默认行为),即默认任务执行之前就会发送确认;
Task.track_started:如果True任务在工作人员执行任务时将其状态报告为“已启动”。默认是False;
我们启动celery来看下celery里在执行任务的过程中有什么变化:
(1)启动项目:
楼主用的是gunicorn工具启动,配置多线程:
gunicorn.conf
workers = 16 ## 多线程配置
bind = '0.0.0.0:7777'
proc_name = 'websocket(项目名称)'
limit_request_field_size = 0
limit_request_line = 0
log_level = 'error'
debug = True
chdir = '/data/websocket' ## 项目目录
启动命令:gunicorn -c /项目目录/gunicorn.conf kernel:app
(2)启动celery:
cd 到项目目录下,执行 celery -A kernel.views.api worker -l info
(3)使用postman调用接口:
可以看到直接先返回我们状态码2-等待状态:
(4)从日志看异步任务执行过程:
1.会先在celery里出现一个异步任务,并生成一个异步任务的task-id号:
2.redis去查看是否已有task任务,task-id号是一致的:
用add_project_result保存异步任务执行结果的对象,最终的结果是在redis中,我们也可以去redis里去拿,redis保存的结果。
我们用的redis 2号数据库,select 2 号数据库,keys * 查看redis是否已有任务
任务最终的执行结果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直观,succeded代表异步任务执行成功):
3. 查看项目日志,状态码为1,是回调接口打印出来的,代表返回给回调接口最终结果是成功。
4.最终去数据库看下新添加记录是否已有,这里就不截图了,记录插入成功,异步任务执行成功,也满足了开始我们沟通的三个需求。
5.前端同学给你竖起了大拇指,直呼你牛!
备注:
celery还可以用来做定时任务,感兴趣的伙伴们可以去官网或者其他途径去研究下,楼主第一次写这么大的博客,有些地方我描述不清楚的或者您没太看懂的可以私信我答疑解惑,我的微信zcw576020095,热爱python,热爱运维,一起加油!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)