目录
初识celery
说明
celery的框架由三部分组成:
使用场景
使用优点
安装及使用
安装
案例演示
1. 创建项目celerypro
2. 创建异步任务执行文件celery_task
3. 使用命令启动celery
4. 执行测试函数
5. 异步获取结果
初识celery
celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于处理异步任务队列,同时也支持任务调度。
- 分布式系统:一个系统应用由不同的组件构成,我们将不同组件架构在不同的服务器中,不同组件之间通过消息通信的方式来实现协调工作。
- 图中user指的是django或者flask这样的框架,接收请求之后需要进行处理。
- 图中AMQPbroker指的是中间件,我们可以用RabbitMQ或者Redis来承担相关工作。
- 图中celery workers值得是celery,它作为消费者监听中间件中的任务,这里就需要用到并发的 *** 作了,celery实现了并发(进程+协程),我们只需要调用就可以了。
- 消息中间件(message broker)
- Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。
包括,RabbitMQ, Redis等等。
- 官方推荐用rabbitMQ,因为它持久稳定。
- Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。
- 任务执行单元(worker)
- Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
- Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
- 任务执行结果存储(task result store)
- Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
- 并发:Prefork, Eventlet, gevent, threads/single threaded
- 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
celery是一个强大分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。
我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时 *** 作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
使用优点Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。
Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。
并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
pip install celery
使用过程很简单
1. 确定异步任务函数,通过命令确定之后,生产者会把函数名和相关参数传给消息中间件。
2. 通过一条命令把celery启动,实现celery workers对消息队列的监听。
这里为了方便,我们使用redis来进行演示,redis在使用的时候充当两个角色,一个是消息中间件,一个是存储结果的数据库。
import celery
import time
backend = 'redis://127.0.0.1:6379/1' # 设置redis的1数据库来存放结果
broker = 'redis://127.0.0.1:6379/2' # 设置redis的2数据库存放消息中间件
cel = celery.Celery('test', backend=backend, broker=broker)
# 参数说明:第一个是celery的名字,这个celery和哪个项目相关就命名哪个
# 后面两个关键字参数则是指定消息中间件和结果存放位置。
@cel.task
def send_email(name):
print("向%s发送邮件..." % name)
time.sleep(5)
print("向%s发送邮件完成" % name)
return "ok"
@cel.task
def send_msg(name):
print("向%s发送短信..." % name)
time.sleep(5)
print("向%s发送短信完成" % name)
return "ok"
3. 使用命令启动celery
celery --app=demo worker -l INFO
下面是执行之后的效果:
在使用过程中也可能会出现下面问题:
这是因为在celery用到了协程,协程在使用的使用需要用猴子补丁,具体解决方式是,首先下载eventlet:
pip install eventlet
然后对启动命令进行修改:
celery --app=celery_task worker -l INFO -P eventlet
4. 执行测试函数
建立一个文件,放入下面代码进行celery异步函数的测试:
from celery_task import send_email, send_msg
result1 = send_email.delay("张三")
print(result1.id)
result2 = send_email.delay("李四")
print(result2.id)
result3 = send_email.delay("王五")
print(result3.id)
result4 = send_email.delay("赵六")
print(result4.id)
下面是运行结果,而在celery的界面中也可以看到对应的日志信息:
运行的结果不是异步函数的返回值,而是一个id值,因为celery会将函数进行异步处理,结果会存放到指定的数据库中,而我们取值的时候就需要用id值了。
from celery.result import AsyncResult
from celery_task import cel
async_result=AsyncResult(id="275f43a8-a5bb-4822-9a90-8be3feeb3b4", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
说明:执行失败效果是代码有错但是异步不会停止,还是会执行获得id,但是当获取结果的时候,async_result.failed()结果为真。
如果要演示记得重启celery,否则修改不生效。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)