更新:从那以后,我们就开始使用一种更好的方式来处理应用程序拆卸,并根据最新的flask文档中描述的模式在每个任务的基础上进行设置。
extensions.py
import flaskfrom flask.ext.sqlalchemy import SQLAlchemyfrom celery import Celeryclass FlaskCelery(Celery): def __init__(self, *args, **kwargs): super(FlaskCelery, self).__init__(*args, **kwargs) self.patch_task() if 'app' in kwargs: self.init_app(kwargs['app']) def patch_task(self): Taskbase = self.Task _celery = self class ContextTask(Taskbase): abstract = True def __call__(self, *args, **kwargs): if flask.has_app_context(): return Taskbase.__call__(self, *args, **kwargs) else: with _celery.app.app_context(): return Taskbase.__call__(self, *args, **kwargs) self.Task = ContextTask def init_app(self, app): self.app = app self.config_from_object(app.config)celery = FlaskCelery()db = SQLAlchemy()
app.py
from flask import Flaskfrom extensions import celery, dbdef create_app(): app = Flask() #configure/initialize all your extensions db.init_app(app) celery.init_app(app) return app
通过这种方式设置应用程序后,您可以运行和使用celery,而不必从应用程序上下文中显式运行它,因为如果需要,所有任务将自动在应用程序上下文中运行,而您不必明确担心任务后拆卸,这是一个重要的管理问题(请参见下面的其他回复)。
故障排除那些不断得到的人
with _celery.app.app_context(): AttributeError: 'FlaskCelery' objecthas no attribute 'app'要确保:
- 将
celery
导入保持在app.py
文件级别。避免:
app.py
from flask import Flaskdef create_app(): app = Flask() initiliaze_extensions(app) return appdef initiliaze_extensions(app): from extensions import celery, db # DOOMED! Keep celery import at the FILE level db.init_app(app) celery.init_app(app)
在您
flask run
使用之前开始使用芹菜工作者celery worker -A app:celery -l info -f celery.log
注意
app:celery,即从加载
app.py。
您仍然可以从扩展名导入来装饰任务,即
from extensions import celery。下面的旧答案仍然有效,但不是一个干净的解决方案
我更喜欢通过在应用程序上下文中创建一个单独的文件来调用celery.start()来在应用程序上下文中运行所有celery。这意味着您的任务文件不必因上下文设置和拆卸而乱七八糟。它也很适合烧瓶的“应用程序工厂”模式。
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemyfrom celery import Celerydb = SQLAlchemy()celery = Celery()
task.py
from extensions import celery, dbfrom flask.globals import current_appfrom celery.signals import task_postrun@celery.taskdef do_some_stuff(): current_app.logger.info("I have the application context") #you can now use the db object from extensions@task_postrun.connectdef close_session(*args, **kwargs): # Flask SQLAlchemy will automatically create new sessions for you from # a scoped session factory, given that we are maintaining the same app # context, this ensures tasks have a fresh session (e.g. session errors # won't propagate across tasks) db.session.remove()
app.py
from extensions import celery, dbdef create_app(): app = Flask() #configure/initialize all your extensions db.init_app(app) celery.config_from_object(app.config) return app
RunCelery.py
from app import create_appfrom extensions import celeryapp = create_app()if __name__ == '__main__': with app.app_context(): celery.start()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)