Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
- 并发:Prefork, Eventlet, gevent, threads/single threaded
- 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时 *** 作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
1.3 celery安装你可以安装Celery通过Python包管理平台(PyPI)或者源码安装
使用pip安装:
pip install -U Celery pip install eventlet二、Celery 执行异步任务 2.1 基本使用
创建项目celerypro
创建异步任务执行文件celery_task.py:
import celery import time backend='redis://127.0.0.1:6379/1' broker='redis://127.0.0.1:6379/2' cel=celery.Celery('test',backend=backend,broker=broker) @cel.task def send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"
创建执行任务文件,produce_task.py:
from celery_task import send_email result = send_email.delay("yuan") print(result.id) result2 = send_email.delay("alex") print(result2.id)
异步启动命令:
celery -A celery_task worker -l info -P eventlet -c 10
在window环境下运行redis需要eventlet协程合作,-c表示协程(并发)的进程数为10
报错: consumer: Cannot connect to redis://127.0.0.1:6379/2: Error 10061 connecting to 127.0.0.1:6379. 由于目标计算机积极拒绝,无法连接
解决方案:redis.exceptions.ConnectionError: Error 10061 connecting to 127.0.0.1:6379. 由于目标计算机积极拒绝,无法连接_小呆丶的博客-CSDN博客_redis由于目标计算机积极拒绝,无法连接。
2.2 多任务结构
#celery.py from celery import Celery cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=['celery_tasks.task01', 'celery_tasks.task02' ]) # 时区 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
# task01.py import time from celery_tasks.celery import cel @cel.task def send_email(res): print("向%s发送邮件任务"%res) time.sleep(5) return "发送邮件完成" # task02.py import time from celery_tasks.celery import cel @cel.task def send_msg(name): print("向%s发送短信任务"%name) time.sleep(5) return "发送短信完成"
在终端启动redis服务:celery -A celery_tasks worker -l info -P eventlet
# produce_task.py from celery_tasks import task01, task02 result = task01.send_email.delay("钟晓挺") print(result.id) result2 = task02.send_msg.delay("苏威") print(result2.id)
运行produce_task.py文件,发送消息
# check_task.py from celery.result import AsyncResult from celery_tasks.celery import cel async_result = cel.AsyncResult(id='5c16df85-c426-446d-8b17-ae66633dd7e3', app=cel) if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print("执行失败") elif async_result.status == 'PENDING': print("任务等待被执行") elif async_result.status == 'RETRY': print("任务异常后重试") elif async_result.status == 'STARTER': print("任务已经开始被执行")三、 Celery执行定时任务
设定时间让celery执行一个定时任务,produce_task.py:
# produce_task.py from datetime import datetime from celery_task import send_email,send_msg # 方式一 v1 = datetime(2020, 11, 7, 21, 35, 00) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) result = send_email.apply_async(args=['egon'], eta = v2) print(result.id) # 方式二 ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=1=10) task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = send_email.apply_async(args=["egon"], eta=task_time) print(result.id)
多任务结构
# celery.py from datetime import timedelta from celery import Celery cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=['celery_tasks.task01', 'celery_tasks.task02' ]) # 时区 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False cel.conf.beat_schedule = { 'add-every-10-seconds': { # 执行tasks下的test_celery函数 'task': 'celery_tasks.task01.send_email', # 每隔10秒执行一次 'schedule': timedelta(seconds=6), # 每隔6秒发送一次 # 传递参数 'args': ('张三',) } }
在终端中启动:celery -A celery_tasks worker -l info -P eventlet -c 10
另开辟一个终端:celery -A celery_tasks beat
四、 在Django中使用celery创建django项目DjangoCelery
启动django:python manage.py runserver
在DjangoCelery目录下url.py 中配置路由
创建app01,celery_test目录
在celery_test目录下新建sms目录
配置config.py文件
broker_url = 'redis://127.0.0.1:6379/3' result_backend = 'redis://127.0.0.1:6379/4'
main.py文件
import os from celery import Celery app = Celery("sms") # 加载django配置 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev') # 通过app对象加载配置 app.config_from_object("celery_test.config") # 加载任务路径 app.autodiscover_tasks(['celery_test.sms'])
tasts.py文件(必须是这个名字:启动celery库包里的任务)
from celery_test.main import app import time import logging log = logging.getLogger("django") @app.task def send_sms(mobile): # 发送短信 print("向手机号%s发送短信" % mobile) time.sleep(5) return "send_sms OK" @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名 def send_sms2(mobile): print("向手机号%s发送短信成功!" % mobile) time.sleep(5) return "send_sms2 OK"
启动终端:celery -A celery_test.main worker -l info -P eventlet
app01中的views.py
from django.http import HttpResponse from celery_test.sms.tasks import send_sms, send_sms2 from datetime import datetime, timedelta def test(request): # 异步任务 # send_sms.delay("zhong") # send_sms2.delay("xiao") # 定时任务 ctime = datetime.now() utc_time = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=6) task_time = utc_time + time_delay result = send_sms.apply_async(["zhong", ], eta=task_time) result1 = send_sms2.apply_async(["xiao", ], eta=task_time) print(result.id) return HttpResponse("ok")
启动django项目:python manage.py runserver
访问路由时可以看到异步发送短信,或者是定时发送短信
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)