在运行任务之前,如何设置Celery以调用自定义初始化函数?

在运行任务之前,如何设置Celery以调用自定义初始化函数?,第1张

在运行任务之前,如何设置Celery以调用自定义初始化函数?

您可以编写自定义加载程序,也可以使用信号

加载程序具有该

on_task_init
方法,该方法将在要执行任务时调用,该方法
on_worker_init
由celery +
celerybeat主进程调用。

使用信号可能是最简单的,可用的信号是:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

在工作人员即将执行任务时(或在使用

apply
/或
CELERY_ALWAYS_EAGER
已设置的情况下,在本地)执行任务时调度。

  • task_postrun(task_id, task, args, kwargs, retval)
    在与上述相同的条件下执行任务后调度。

  • task_sent(task_id, task, args, kwargs, eta, taskset)

应用任务时调用(不适用于长时间运行的 *** 作)

0.9.x中可用的其他信号(github上的当前主分支):

  • worker_init()

在celeryd启动时调用(在初始化任务之前,因此,如果在支持的系统上

fork
,则任何内存更改都将复制到子worker进程)。

  • worker_ready()

在celeryd能够接收任务时调用。

  • worker_shutdown()

在celeryd关闭时调用。

这是一个示例,该示例在流程中第一次运行任务时进行预先计算:

from celery.task import Taskfrom celery.registry import tasksfrom celery.signals import task_prerun_precalc_table = {}class PowersOfTwo(Task):    def run(self, x):        if x in _precalc_table: return _precalc_table[x]        else: return x ** 2tasks.register(PowersOfTwo)def _precalc_numbers(**kwargs):    if not _precalc_table: # it's empty, so haven't been generated yet        for i in range(1024): _precalc_table[i] = i ** 2# need to use registered instance for sender argument.task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

如果要对所有任务运行该函数,只需跳过

sender
参数。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5661605.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存