如何在Celery任务中使用Flask-SQLAlchemy

如何在Celery任务中使用Flask-SQLAlchemy,第1张

如何在Celery任务中使用Flask-SQLAlchemy

更新:从那以后,我们就开始使用一种更好的方式来处理应用程序拆卸,并根据最新的flask文档中描述的模式在每个任务的基础上进行设置。

extensions.py

import flaskfrom flask.ext.sqlalchemy import SQLAlchemyfrom celery import Celeryclass FlaskCelery(Celery):    def __init__(self, *args, **kwargs):        super(FlaskCelery, self).__init__(*args, **kwargs)        self.patch_task()        if 'app' in kwargs: self.init_app(kwargs['app'])    def patch_task(self):        Taskbase = self.Task        _celery = self        class ContextTask(Taskbase): abstract = True def __call__(self, *args, **kwargs):     if flask.has_app_context():         return Taskbase.__call__(self, *args, **kwargs)     else:         with _celery.app.app_context():  return Taskbase.__call__(self, *args, **kwargs)        self.Task = ContextTask    def init_app(self, app):        self.app = app        self.config_from_object(app.config)celery = FlaskCelery()db = SQLAlchemy()

app.py

from flask import Flaskfrom extensions import celery, dbdef create_app():    app = Flask()    #configure/initialize all your extensions    db.init_app(app)    celery.init_app(app)    return app

通过这种方式设置应用程序后,您可以运行和使用celery,而不必从应用程序上下文中显式运行它,因为如果需要,所有任务将自动在应用程序上下文中运行,而您不必明确担心任务后拆卸,这是一个重要的管理问题(请参见下面的其他回复)。

故障排除

那些不断得到的人

with _celery.app.app_context(): AttributeError: 'FlaskCelery' objecthas no attribute 'app'
要确保:

  1. celery
    导入保持在
    app.py
    文件级别。避免:

app.py

from flask import Flaskdef create_app():    app = Flask()    initiliaze_extensions(app)    return appdef initiliaze_extensions(app):    from extensions import celery, db # DOOMED! Keep celery import at the FILE level    db.init_app(app)    celery.init_app(app)
  1. 在您

    flask run
    使用之前开始使用芹菜工作者

    celery worker -A app:celery -l info -f celery.log

注意

app:celery
,即从加载
app.py

您仍然可以从扩展名导入来装饰任务,即

from extensions import celery

下面的旧答案仍然有效,但不是一个干净的解决方案

我更喜欢通过在应用程序上下文中创建一个单独的文件来调用celery.start()来在应用程序上下文中运行所有celery。这意味着您的任务文件不必因上下文设置和拆卸而乱七八糟。它也很适合烧瓶的“应用程序工厂”模式。

extensions.py

from from flask.ext.sqlalchemy import SQLAlchemyfrom celery import Celerydb = SQLAlchemy()celery = Celery()

task.py

from extensions import celery, dbfrom flask.globals import current_appfrom celery.signals import task_postrun@celery.taskdef do_some_stuff():    current_app.logger.info("I have the application context")    #you can now use the db object from extensions@task_postrun.connectdef close_session(*args, **kwargs):    # Flask SQLAlchemy will automatically create new sessions for you from     # a scoped session factory, given that we are maintaining the same app    # context, this ensures tasks have a fresh session (e.g. session errors     # won't propagate across tasks)    db.session.remove()

app.py

from extensions import celery, dbdef create_app():    app = Flask()    #configure/initialize all your extensions    db.init_app(app)    celery.config_from_object(app.config)    return app

RunCelery.py

from app import create_appfrom extensions import celeryapp = create_app()if __name__ == '__main__':    with app.app_context():        celery.start()


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5647843.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存