例如,任务在队列x中运行,异常将其排入另一个名为error_x的队列
编辑:
目前我使用celery == 3.0.13以及django 1.4,Rabbitmq作为经纪人.
有时候任务失败了.在芹菜中有没有办法将消息添加到错误队列并稍后处理它.
celery任务失败时的问题是我无法访问消息队列名称.所以我不能使用self.retry重试将它放到不同的错误队列中.
解决方法 好吧,如果要将任务路由到另一个队列,则无法使用重试机制.来自文档:retry() can be used to re-execute the task,for example in the event
of recoverable errors.When you call retry it will send a new message,using the same
task-ID,and it will take care to make sure the message is delivered
to the same queue as the originating task.
如果出现任何异常,您必须重新启动并手动将其路由到您想要的队列.这似乎是一个很好的工作error callbacks.
主要问题是我们需要在错误回调中获取任务名称才能启动它.此外,我们可能不希望每次启动任务时都添加回调.因此,装饰器将是自动添加正确回调的好方法.
from functools import partial,wrapsimport celery@celery.shared_taskdef error_callback(task_ID,task_name,retry_queue,retry_routing_key): # We must retrIEve the task object itself. # `tasks` is a dict of 'task_name': celery_task_object task = celery.current_app.tasks[task_name] # Re launch the task in specifIEd queue. task.apply_async(queue=retry_queue,routing_key=retry_routing_key)def retrying_task(retry_queue,retry_routing_key): """Decorates function to automatically add error callbacks.""" def retrying_decorator(func): @celery.shared_task @wraps(func) # just to keep the original task name def wrapper(*args,**kwargs): return func(*args,**kwargs) # Monkey patch the apply_async method to add the callback. wrapper.apply_async = partial( wrapper.apply_async,link_error=error_callback.s(wrapper.name,retry_routing_key) ) return wrapper return retrying_decorator# Usage:@retrying_task(retry_queue='another_queue',retry_routing_key='another_routing_key')def failing_task(): print 'Hi,I will fail!' raise Exception("I'm failing!")failing_task.apply_async()
您可以调整装饰器以传递您需要的任何参数.
总结以上是内存溢出为你收集整理的python – 在芹菜中有错误队列全部内容,希望文章能够帮你解决python – 在芹菜中有错误队列所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)