当运行类似 如下命令启动一个celery进程时,其实启动的是一个管理进程,此进程不处理实际的任务,而是产生的子进程或线程 去处理具体任务;那么这些 子进程或线程 在一起就叫做 执行池;
celery worker --app=worker.app
执行池的大小(子进程或线程的个数) 决定了 celery可以并发执行任务的个数;如果想尽可能快和多的执行任务,那么 增加执行池的大小 是个可以考虑的解决方案;
选择不同的执行池工作方式 Preforkcelery默认的执行池工作方式,是多进程的执行池,一般在 计算密集型任务中使用,能充分利用cpu多核; 如果不指定 –concurrency(并发进程个数)参数,则无论什么执行池都是尽可能多的使用cpu的核数;
Solo这种执行池有点特殊,因为此模式在处理任务时 直接在 管理进程中进行; 不是基于进程或线程的工作模式; 也就是一直只有一个 消费者 在处理任务;上个任务不结束,则下个任务就会阻塞; 但是在微服务中 比如 在使用k8s 进行 docker部署时,可以使用此模式,这样 k8s直接通过观察启动多少个docker容器 就能知道启动了多少个celery消费者;
此模式即使指定--concurrency(并发数) 参数值 也没有任何意义;
Eventlet/Gevent2个都是基于协程 的执行池,一般在 IO密集型任务中使用,如频繁的网络请求,数据库 *** 作等等;
gevent是对eventlet的高级封装,一般使用时 用 gevent,因为此包有monkey.patch_all()方法将 所有能转为协程的地方都转为协程,从而增加处理能力;
此模式指定--concurrency(并发数) 参数值可以比CPU核数多,理论上 只要内存不爆,套接字够用,就可以增加;
实际使用需求: 多任务 从mysql查询将 数据导入到mongo中;且任务间没有依赖关系
实现:使用celery+gevent实现任务并发;
用法:此任务为IO密集型任务(查询mysql,并发导入mongo[业务端只需将数据发送给mongo,其内部会实现并发处理]),且每个任务内存消耗均有一些;因此使用gevent,且设置并发数不能过多,否则可能出现内存不足情况;
部分代码如下:
""" 没有 celery定时任务,所以不需要 心跳服务 此次使用 基于协程的 gevent作为 执行池(execution pool);因为 瓶颈在于 同步到mongo的IO上; 本地测试时 可将 --pool=gevent 改为 --pool=solo 使worker顺序消费任务方便debug; """ from gevent import monkey # 使用gevent注意调用此方法 monkey.patch_all() import os from app import create_app from app.core import celery app = create_app() if __name__ == '__main__': os.environ["START_METHOD"] = "worker" # 本地开发调试时,注意将 --pool 值改为 solo celery.start( ["celery", "-A", "worker.celery", "worker", "--pool=gevent", "--loglevel=INFO", "-E", "-n", 'node_name_xxx', "-Q", "queue_name_xxx"])相关链接
Celery Execution Pools
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)