您可以编写自定义加载程序,也可以使用信号。
加载程序具有该
on_task_init方法,该方法将在要执行任务时调用,该方法
on_worker_init由celery +
celerybeat主进程调用。
使用信号可能是最简单的,可用的信号是:
0.8.x:
task_prerun(task_id, task, args, kwargs)
在工作人员即将执行任务时(或在使用
apply/或
CELERY_ALWAYS_EAGER已设置的情况下,在本地)执行任务时调度。
task_postrun(task_id, task, args, kwargs, retval)
在与上述相同的条件下执行任务后调度。task_sent(task_id, task, args, kwargs, eta, taskset)
应用任务时调用(不适用于长时间运行的 *** 作)
0.9.x中可用的其他信号(github上的当前主分支):
worker_init()
在celeryd启动时调用(在初始化任务之前,因此,如果在支持的系统上
fork,则任何内存更改都将复制到子worker进程)。
worker_ready()
在celeryd能够接收任务时调用。
worker_shutdown()
在celeryd关闭时调用。
这是一个示例,该示例在流程中第一次运行任务时进行预先计算:
from celery.task import Taskfrom celery.registry import tasksfrom celery.signals import task_prerun_precalc_table = {}class PowersOfTwo(Task): def run(self, x): if x in _precalc_table: return _precalc_table[x] else: return x ** 2tasks.register(PowersOfTwo)def _precalc_numbers(**kwargs): if not _precalc_table: # it's empty, so haven't been generated yet for i in range(1024): _precalc_table[i] = i ** 2# need to use registered instance for sender argument.task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
如果要对所有任务运行该函数,只需跳过
sender参数。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)