python– 使用Celery在部分任务中具有位置参数的链组

python– 使用Celery在部分任务中具有位置参数的链组,第1张

概述我正在编写一个应用程序,它将异步执行一组多个同步任务链.换句话说,我可能有管道foo(a,b,c) - >一些bs列表的boo(a,b,c).我的理解是创建一个foo链(a,b,c)| boo(a,b,c)表示此列表中的每个b.然后这些链形成一个芹菜组,可以异步应用.我的代码如下: my_app.py #!/usr/bin/env python3 im

我正在编写一个应用程序,它将异步执行一组多个同步任务链.

换句话说,我可能有管道foo(a,b,c) – >一些bs列表的boo(a,c).

我的理解是创建一个foo链(a,c)| boo(a,c)表示此列表中的每个b.然后这些链形成一个芹菜组,可以异步应用.

我的代码如下:

my_app.py
#!/usr/bin/env python3import functoolsimport timefrom celery import chain,group,Celeryfrom celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)app = Celery("my_app",broker='redis://localhost:6379/0',backend='redis://localhost:6379/0')@app.taskdef foo(a,c):    logger.info("foo from {0}!".format(b))    return b@app.taskdef boo(a,c):    logger.info("boo from {0}!".format(b))    return bdef break_up_tasks(tasks):    try:        first_task,*remaining_tasks = tasks    except ValueError as e:        first_task,remaining_tasks = [],[]    return first_task,remaining_tasksdef do_tasks(a,bs,c,opts):    tasks = [foo,boo]    # There should be an option for each task    if len(opts) != len(tasks):        raise ValueError("There should be {0} provIDed options".format(len(tasks)))    # Create a List of tasks that should be included per the List of options' boolean values    tasks = [task for opt,task in zip(opts,tasks) if opt]    first_task,remaining_tasks = break_up_tasks(tasks)    # If there are no tasks,we're done.    if not first_task: return    chains = (        functools.reduce(            # `a` should be provIDed by `apply_async`'s `args` kwarg            # `b` should be provIDed by prevIoUs partials in chain            lambda x,y: x | y.s(c),remaining_tasks,first_task.s(a,c)        ) for b in bs    )    g = group(*chains)    res = g.apply_async(args=(a,),queue="default")    print("ApplIEd async... waiting for termination.")    total_tasks = len(tasks)    while not res.ready():        print("Waiting... {0}/{1} tasks complete".format(res.completed_count(),total_tasks))        time.sleep(1)if __name__ == "__main__":    a = "whatever"    bs = ["hello","world"]    c = "baz"    opts = [        # do "foo"        True,# do "boo"        True    ]    do_tasks(a,opts)
Running celery
celery worker -A my_app -l info -c 5 -Q default

但是,我发现,当我运行上述 *** 作时,我的服务器客户端运行无限循环,因为boo缺少一个参数

TypeError: boo() missing 1 required positional argument: 'c'

我的理解是apply_async将为每个链提供args kwarg,并且链中的先前链接将为后续链接提供其返回值.

为什么嘘声没有正确接受论点?我确信这些任务写得不好,因为这是我第一次涉足Celery.如果您有其他建议,我很乐意接受他们.

最佳答案在调试你的代码之后(我也是Celery的新手!:))我已经知道每个链接的函数都会将第一个参数替换为前一个链接函数调用的结果 – 所以用这个说我相信你的解决方案问题是在reduce中的ys中添加一个缺少的参数(第二个):

chains = (    functools.reduce(        # `a` should be provIDed by `apply_async`'s `args` kwarg        # `b` should be provIDed by prevIoUs partials in chain        lambda x,y: x | y.s(b,c),# <- here is the 'new guy'        remaining_tasks,c)    ) for b in bs)

希望能帮助到你. 总结

以上是内存溢出为你收集整理的python – 使用Celery在部分任务中具有位置参数的链组全部内容,希望文章能够帮你解决python – 使用Celery在部分任务中具有位置参数的链组所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1206216.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存