Flask+Celery+Redis实现队列化异步任务

Flask+Celery+Redis实现队列化异步任务,第1张

Flask+Celery+Redis实现队列化异步任务 概述:

        我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用一些开源工具的sdk和api,或是运行一些耗时比较大的任务(单个大任务下可能有多个小任务),需要一段时间才能提供执行结果,而前端同事要求不能让用户在页面等待,需要马上提供一个返回结果给他,任务执行完后可以拿到最终结果,并且用户退出web界面或浏览器异常关闭之后,再次返回界面,执行的过程不会中断,并且支持多用户同时执行不同 *** 作的需要。

        很明显,这是一个-异步多线程-的场景,在Python中可以想到的有:

        1.引入Asyncio模块,利用多协程实现。

        2.使用Threading模块,自己编写线程任务,线程等待,睡眠,释放线程的过程。

        3.使用异步框架,例如Cerely、Tornado、Twisted等等,装饰异步任务。

        这里边最便捷且开发效率最高的应该是使用异步框架,咱们选择使用Celery来实现这个需求。

Celery介绍:

        截图与描述来自celery官网:Celery - Distributed Task Queue — Celery 5.2.0 documentation

        Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为 *** 作提供维护此类系统所需的工具。

        它是一个专注于实时处理的任务队列,同时也支持任务调度。

        Celery 拥有庞大而多样化的用户和贡献者社区,您应该加入我们的 IRC 或我们的邮件列表。

        Celery 是开源的,并在BSD 许可下获得许可。

消费者与消费结果:

        我们除了需要Celery做异步任务的处理,还需要一个中间件来充当消费者,并保存最终的任务处理结果(消费结果),这里有很多中间件可以选,例如常用的消息中间件,rabbitmq,kafka等,还可以使用mysql,redis等作为消费者并保存消费结果(因为最终的处理结果要返回给前端同事),楼主最终选择了redis。

Redis安装与配置:

        这里不再赘述windows下安装redis步骤,只介绍linux下安装redis与配置,我的机器是centos7.6:

        yum方式安装(注意:这样安装的redis不是最新版本的,如有对版本要求比较高的,建议去官网下载源码包去手动安装,官网地址:Redis,最新版本:6.2.6)

yum -y install redis

        安装完成之后配置redis.conf文件:

vi /etc/redis.conf

        修改这一行,改成 0.0.0.0,这样别的应用和组件才可以访问到redis的服务与端口:

        同理,redis的默认端口也可以在此配置里修改:

        还有一些关闭匿名访问,设置密码等配置的修改,项目若要上到公网环境下,建议配置。

        启动并测试redis服务功能是否正常:

        启动redis:        

redis-cli -h 0.0.0.0

        测试redis:

1 redis> set name "zzz"
2 
3 OK
4 
5 redis> get name
6 
7 "zzz"

        记住,代码并没有实际引用redis,但也需要安装redis模块,否则会报错。(redis模块版本不要太高,高了也会报错,这些坑都是楼主亲自趟过的,我这里使用2.10.6)

pip install redis==2.10.6
 Celery的安装和配置:

        windos和linux下都可以使用pip安装:

 pip install celery==3.1.25

        我的项目目录:(celeryconfig.py与__init__.py文件为celery与redis配置文件):

          

        在项目中先创建一个名为config的python目录,并在__init__.py中导入celery模块并配置:

__init__.py:

from celery import Celery,platforms
platforms.C_FORCE_ROOT = True

app = Celery('prod')  # 创建 Celery 实例
app.config_from_object('kernel.config.celeryconfig')  # 通过 Celery 实例加载配置模块

        platforms.C_FORCE_ROOT = True 这个配置一定要有,否则会报权限问题。

        在config目录下的celeryconfig.py中配置任务队列消费者与消费结果保存在redis的地址:

celeryconfig.py:

## celery配置

BROKER_URL = 'redis://redis-host:6379/1'  # 指定 Broker消费者,我们使用redis 1号数据库
CELERY_RESULT_BACKEND = 'redis://redis-host:6379/2'  # 指定 Backend,最终消费结果,我们使用redis 2号数据库



CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定时区,默认是 UTC

CELERY_importS = (  # 指定导入的任务模块
    'kernel.views.api'   ## 异步任务代码文件路径即可
)

        至此,前期需要的工具准备工作全部完毕,我们开始我们的开发任务。

异步任务开发:

        楼主因为主要负责后端这块,这里选择使用flask来写,整体的项目模块与版本,大概罗列下:

                        Python 3.5.4
                        Mysql  5.5.64        
                        Celery==3.1.25
                        Flask==1.1.4
                        Redis==2.10.6

        这时我们与前端同事再次详细沟通了下,初步约定如下:

        1.前端通过form表单传数据给后端,格式为json,分析:需要解析json数据。

        2.因为存在长耗时的任务,要求一旦前端请求过来,后端要马上返回一个中间结果给前端(这样解决了前端页面等待的问题),分析:需要马上提供一个返回结果。

        3.前端最终要拿到任务的最终执行结果,分析:我们需要把长耗时异步任务的最终结果推送给前端,需要任务代码最后推送执行结果。(自己先定义回调接口去测试)

1.后端Flask接口代码: 文件名称与路径:

        项目名称-kernel-view-api.py,与celery配置下的任务模块对应。

 api.py:

# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.models.playbook import PlayBook_file
from kernel.utils import render_response, Retval
from kernel.models import db
from sqlalchemy import or_,text
import gitlab  ## 导入gitlab模块
from kernel.config import app, cmdb_config,hcacp_config
import pymysql,uuid,hashlib,time
from datetime import timezone



bp = Blueprint('test', __name__)  ## 蓝图自己定义,这里只是实例化
log = logging.getLogger(__name__)    ## 日志自己定义,这里只是实例化

class status:  ## 定义一些状态码
    success = 0
    warning = 1
    pending = 2
    faild = -1


## 回调接口
@bp.route('/test/callback/', methods=['GET', 'POST'])
def ansible_aaa():
    data1 = request.get_data(as_text=True)
    # data2 = json.loads(data1)
    log.info(data1)
    return data1

@bp.route('/test/add/', methods=['POST', 'GET'])
def devops_add():
    '''
        获取form表单json数据
    '''
    # return True
    try:
        data = request.get_data()
        _data = json.loads((str(data, 'utf-8')))
        print(_data)
    except Exception as requestdata_except:
        log.error('获取表单数据异常,异常原因:%s' % requestdata_except)
        return render_response(status.faild, u"获取表单数据异常,异常原因:%s" % requestdata_except, {})
        ## 获取标识tag的结果
    try:
        '''
        工单json数据要带工单标识符select_tag:
        create_project:新建项目申请工单
        '''
        select_tag = _data.get('select_tag')
    except Exception as request_select_tag_except:
        log.error('获取表单需求标识select_tag异常,异常原因:%s' % request_select_tag_except)
        return render_response(status.faild, u"获取表单需求标识select_tag异常,异常原因:%s" % request_select_tag_except, {})
    try:
        """ 
            !--当参数select_tag == create_project 时,建立项目--! 

        """
        if select_tag == 'create_project':
            projname = _data.get('projname')
            add_project_result = add_project.delay(cmdb_config, _data)
            return render_response(status.pending, u"devops系统添加项目工单任务执行中--pending--", {'项目中文名称': projname})
    except Exception as do_celery_job_except:
        log.error('执行异步celery任务异常,异常原因:%s' % do_celery_job_except)
        return render_response(status.faild, u"执行异步celery任务异常,异常原因:%s" % do_celery_job_except, {})

这里代表前端请求过来之后,马上返回一个执行结果,满足需求2:

在devops_add接口里执行异步任务:        

        add_project_result = add_project.delay(cmdb_config, _data)

官网的示例:

        ## 1.扩号里为异步任务所需的参数

        ## 2.add_project_result 是异步任务执行的对象,包含很多属性方法,下边介绍一些常用的:

        获取任务结果和状态:
        add_project_result = task.apply_async()
        add_project_result.ready()     # 查看任务状态,返回布尔值,  任务执行完成, 返回 True, 否则返回 False.
        add_project_result.wait()      # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
        add_project_result.get(timeout=1)       # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
        add_project_result.result      # 任务执行结果,未完成返回None;
        add_project_result.state       # PENDING, START, SUCCESS,任务当前的状态
        add_project_result.status      # PENDING, START, SUCCESS,任务当前的状态
        add_project_result.successful  # 任务成功返回true
        add_project_result.traceback  # 如果任务抛出了一个异常,可以获取原始的回溯信息

     

2.异步任务代码: 文件名称与路径:

        项目名称-kernel-view-api.py

api.py

解释:

        因为要满足需求3,把最终异步耗时任务的真正结果给到前端,所以我们需要在异步任务里写一个回调的 *** 作。

         header = {'Content-Type': 'application/json'}  ## 构造请求头和数据类型
        _json = {"status": sttaus.faild, "msg": u"失败", "data": {}}  ## 失败就返回给前端json类型失败

        _json = {"status": sttaus.success, "msg": u"成功", "data": {}}  ## 成功就返回给前端json类型成功

        requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 带参回调请求

# -*- coding: utf-8 -*-
import json, sys
import logging
import requests
import datetime,pymysql
import os,subprocess
from flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Response
from kernel.utils import render_response, Retval
from datetime import timezone
from kernel.config import *  ## 导入config目录下的celery配置


bp = Blueprint('test', __name__)  ## 蓝图自己定义,这里只是实例化
log = logging.getLogger(__name__)    ## 日志自己定义,这里只是实例化

class status:  ## 定义一些状态码
    success = 0
    warning = 1
    pending = 2
    faild = -1



## 示例函数:一个添加信息函数,前端给我们json数据,后端接受之后去插入数据库,完成 *** 作并告诉前端
@app.task  ## celery添加项目任务
def add_project(mysql_config, _data):
    try:
        ## 系统添加项目信息工单
        projname = _data.get('projname')  ## 项目名称,必填
        prodesc= _data.get('prodesc')  ## 项目描述,必填
        projctime = datetime.datetime.now()  ## 项目发布时间
        callback_url = _data.get('callback_url')  ## 回调接口地址
    except Exception as describe_form_except:
        log.error('解析表单数据出现异常,异常原因:%s' % describe_form_except)
        header = {'Content-Type': 'application/json'}  ## 回调接口请求头
        _json = {"status": status.faild, "msg": u"失败", "data": {}}
        requests.post(callback_url, headers=header, data=json.dumps(_json))
    try:
        # 获取数据库连接
        conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db)
        # 返回连接
        cursor = conn.cursor()
    except Exception as connect_except:
        log.error('系统数据库连接出现异常,异常原因:%s' % connect_except)
        _json = {"status": status.faild, "msg": u"失败", "data": {}}
        requests.post(callback_url, headers=header, data=json.dumps(_json))
    try:
        proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ('{}','{}','{}');".format(projname, prodesc, projctime)
        cursor.execute(proj_sql)
        conn.commit()
        _json = {"status": status.success, "msg": u"成功", "data": {}}
        requests.post(callback_url, headers=header, data=json.dumps(_json))
        ## 任务执行完成之后调用回调接口,返回任务执行成功结果
        log.info('系统建项目工单执行成功,%s' % proj_sql)
    except Exception as do_add_project_except:
        _json = {"status": status.faild, "msg": u"失败", "data": {}}
        requests.post(callback_url, headers=header, data=json.dumps(_json))
        log.error('执行添加项目工单异常,异常原因:%s' % do_add_project_except)
        ## 任务执行完成之后调用回调接口,返回任务执行失败结果

        楼主用的最简单,没有在task里写一些属性,类似下边的这种方式还可以给task添加一些属性:

        @app.task(name='test',bind=True,base=baseTask)

       补充介绍下异步task有的一些属性:

        TASK的一般属性:
        Task.name:任务名称;
        Task.request:当前任务的信息;
        Task.max_retries:设置重试的最大次数
        Task.throws:预期错误类的可选元组,不应被视为实际错误,而是结果失败;
        Task.rate_limit:设置此任务类型的速率限制
        Task.time_limit:此任务的硬限时(以秒为单位)。
        Task.ignore_result:不存储任务状态。默认False;
        Task.store_errors_even_if_ignored:如果True,即使任务配置为忽略结果,也会存储错误。
        Task.serializer:标识要使用的默认序列化方法的字符串。
        Task.compression:标识要使用的默认压缩方案的字符串。默认为task_compression设置。
        Task.backend:指定该任务的结果存储后端用于此任务。
        Task.acks_late:如果设置True为此任务的消息将在任务执行后确认 ,而不是在执行任务之前(默认行为),即默认任务执行之前就会发送确认;
        Task.track_started:如果True任务在工作人员执行任务时将其状态报告为“已启动”。默认是False;

我们启动celery来看下celery里在执行任务的过程中有什么变化:

(1)启动项目:

楼主用的是gunicorn工具启动,配置多线程:

gunicorn.conf

        workers = 16   ## 多线程配置

        bind = '0.0.0.0:7777'

        proc_name = 'websocket(项目名称)'

        limit_request_field_size = 0

        limit_request_line = 0

        log_level = 'error'

        debug = True

        chdir = '/data/websocket' ## 项目目录

        启动命令:gunicorn -c  /项目目录/gunicorn.conf kernel:app

(2)启动celery:

        cd 到项目目录下,执行 celery -A kernel.views.api worker -l info  

(3)使用postman调用接口:

        可以看到直接先返回我们状态码2-等待状态:

(4)从日志看异步任务执行过程:

        1.会先在celery里出现一个异步任务,并生成一个异步任务的task-id号:

        2.redis去查看是否已有task任务,task-id号是一致的:

        用add_project_result保存异步任务执行结果的对象,最终的结果是在redis中,我们也可以去redis里去拿,redis保存的结果。

        我们用的redis 2号数据库,select 2 号数据库,keys * 查看redis是否已有任务

        任务最终的执行结果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直观,succeded代表异步任务执行成功):

        3. 查看项目日志,状态码为1,是回调接口打印出来的,代表返回给回调接口最终结果是成功。

        4.最终去数据库看下新添加记录是否已有,这里就不截图了,记录插入成功,异步任务执行成功,也满足了开始我们沟通的三个需求。

        5.前端同学给你竖起了大拇指,直呼你牛!

          

 

备注:

                ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​

 

 

        celery还可以用来做定时任务,感兴趣的伙伴们可以去官网或者其他途径去研究下,楼主第一次写这么大的博客,有些地方我描述不清楚的或者您没太看懂的可以私信我答疑解惑,我的微信zcw576020095,热爱python,热爱运维,一起加油!

        

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5436666.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存