Message queues and task queues: a message queue is a communication model that carries messages from producers to consumers, e.g. Kafka and RabbitMQ. A task queue is built on top of a message queue to run units of work, such as sending emails.
Celery consists of three parts: a message broker (Broker), task execution units (Workers), and a result store (Backend).
Workflow: a task client submits a task to the Broker; a Worker watches the Broker, pulls tasks from it, and executes them; execution results are saved in the Backend.
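The workflow above can be sketched in plain Python with an in-process queue standing in for the Broker and a dict standing in for the Backend. This is a toy model of the message-flow, not Celery itself:

```python
import queue
import threading

broker = queue.Queue()  # stand-in for the Broker
results = {}            # stand-in for the Backend

def worker():
    # Worker loop: take a task off the broker, run it, store the result.
    while True:
        task_id, func, args = broker.get()
        results[task_id] = func(*args)
        broker.task_done()

threading.Thread(target=worker, daemon=True).start()

# The client submits a task to the broker instead of calling it directly.
broker.put(("task-1", lambda x, y: x + y, (2, 3)))
broker.join()  # wait until the worker has processed everything

print(results["task-1"])  # 5
```

Celery does the same thing across processes and machines: the queue lives in RabbitMQ, the workers are separate OS processes, and the results dict is Redis.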
Developing a Celery application involves four steps: initializing the Celery instance, defining tasks, starting a worker, and calling the tasks.
Versions used: Python 3.8.2, celery==5.2.3
2. Example project structure
```
.
├── README.md
├── celery_app          # the application
│   ├── __init__.py     # instance initialization
│   ├── config.py       # configuration
│   └── tasks.py        # task definitions
├── log
│   └── celery_app.log
└── script
    ├── __init__.py
    └── celery_run.py   # task-invocation test
```
Instance initialization
Direct initialization: specify the broker and backend when creating the instance, then load any further settings. Here RabbitMQ is used as the broker and Redis as the backend; options such as the serialization format are applied afterwards.
```python
# __init__.py
# Direct initialization
from celery import Celery

app = Celery(
    'celery_app',
    broker='amqp://sanford:123456@localhost:5672/sanford_host',
    backend='redis://localhost:6379/0',
)
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
)
```
Using a configuration file: put the settings in a configuration module and load it when creating the instance. Note that configuration option names may differ between Celery versions.
```python
# __init__.py
from celery import Celery

app = Celery('celery_app')
# Load the configuration module
app.config_from_object('celery_app.config')
```
```python
# config.py
from celery.schedules import crontab
from kombu import Exchange, Queue

# Broker URL, here RabbitMQ: 'amqp://username:password@host:port/virtual_host'
broker_url = 'amqp://sanford:123456@localhost:5672/sanford_host'

# Result backend, here Redis: 'redis://host:port/db'
result_backend = 'redis://localhost:6379/0'

# Task serialization format: json or msgpack
task_serializer = 'json'

# Result serialization format: json or msgpack
result_serializer = 'json'

# Result expiry time, in seconds
result_expires = 60 * 60 * 24

# Content types the worker will accept
accept_content = ['json', 'msgpack']

# Timezone
timezone = 'Asia/Shanghai'

# Modules to import when the worker starts.
# Note the trailing comma: this must be a tuple, not a bare string.
imports = ('celery_app.tasks',)

# Periodic task schedule for celery_app.tasks.timing
beat_schedule = {
    'test_timing': {
        'task': 'celery_app.tasks.timing',
        'schedule': crontab(minute='*/1'),
        'args': ('jay', 'chou'),  # arguments passed to the task
    }
}

# Two queues: timing_queue handles the periodic task, default handles the rest
task_queues = (
    Queue('default',
          exchange=Exchange(name='default', type='direct'),
          routing_key='default'),
    Queue('timing_queue',
          exchange=Exchange(name='timing_exchange', type='direct'),
          routing_key='routing.timing'),
)

# Route celery_app.tasks.timing to its own queue
task_routes = {
    'celery_app.tasks.timing': {
        'queue': 'timing_queue',
        'routing_key': 'routing.timing',
    },
    '*': {
        'queue': 'default',
        'routing_key': 'default',
    },
}

# Default queue for messages whose routing matches none of the above
task_default_queue = 'default'
```
Defining tasks:
Three tasks are defined for testing, covering both asynchronous calls and scheduled (periodic) calls. Tasks are defined with Celery's decorator:
```python
# tasks.py
from celery_app import app

@app.task
def add(x, y):
    return x + y

@app.task
def cheng(x, y):  # "cheng" = multiply
    return x * y

@app.task
def timing(*args):
    return args
```
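The decorated functions stay directly callable; the decorator also attaches methods such as `.delay()` that enqueue the call instead of running it. As a rough, hypothetical illustration of that pattern (not Celery's actual implementation):

```python
import functools

def task(func):
    """Toy stand-in for @app.task: keeps the function callable and
    attaches a .delay() that would normally enqueue the call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    # Real Celery serializes the call and sends it to the broker;
    # this sketch just records what would be sent.
    wrapper.delay = lambda *a, **kw: {"task": func.__name__,
                                      "args": a, "kwargs": kw}
    return wrapper

@task
def add(x, y):
    return x + y

print(add(2, 3))        # direct synchronous call: 5
print(add.delay(2, 3))  # the message that would go to the broker
```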
Starting the worker:
```shell
# Start the beat scheduler (publishes the periodic task)
celery -A celery_app beat

# Start a worker
#   -c 4           use 4 worker processes
#   -P eventlet    required on Windows
#   -Q queue_name  consume only the given queue(s)
celery -A celery_app worker --loglevel=info --logfile="log/celery_app.log"

# Start the beat scheduler and a worker in one process
celery -A celery_app worker -B --loglevel=info --logfile="log/celery_app.log"
```
```
 -------------- celery@bogon v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- macOS-12.1-x86_64-i386-64bit 2022-01-27 11:29:31
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x1035d63a0
- ** ---------- .> transport:   amqp://sanford:**@localhost:5672/sanford_host
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default
                .> timing_queue     exchange=timing_exchange(direct) key=routing.timing

[tasks]
  . celery_app.tasks.add
  . celery_app.tasks.cheng
  . celery_app.tasks.timing
```
Task execution results
With the periodic task celery_app.tasks.timing published and the worker started as above, and celery_app.tasks.add also invoked manually in between, the log output is as follows:
```
[2022-01-27 11:29:31,692: INFO/MainProcess] Connected to amqp://sanford:**@127.0.0.1:5672/sanford_host
[2022-01-27 11:29:31,700: INFO/Beat] beat: Starting...
[2022-01-27 11:29:31,701: INFO/MainProcess] mingle: searching for neighbors
[2022-01-27 11:29:31,753: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:29:32,741: INFO/MainProcess] mingle: all alone
[2022-01-27 11:29:32,769: INFO/MainProcess] celery@bogon ready.
[2022-01-27 11:29:32,800: INFO/MainProcess] Task celery_app.tasks.timing[350c3525-99a0-4bcc-91c1-8af203063a9d] received
[2022-01-27 11:29:32,913: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[350c3525-99a0-4bcc-91c1-8af203063a9d] succeeded in 0.009525484000000084s: ('jay', 'chou')
[2022-01-27 11:30:00,000: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:30:00,002: INFO/MainProcess] Task celery_app.tasks.timing[82a9c870-2e6d-408b-8428-e1fde1b618f2] received
[2022-01-27 11:30:00,006: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[82a9c870-2e6d-408b-8428-e1fde1b618f2] succeeded in 0.002058940000001286s: ('jay', 'chou')
[2022-01-27 11:30:23,361: INFO/MainProcess] Task celery_app.tasks.add[b750444f-a66d-4fd3-8179-68712bc21f81] received
[2022-01-27 11:30:23,363: INFO/ForkPoolWorker-8] Task celery_app.tasks.add[b750444f-a66d-4fd3-8179-68712bc21f81] succeeded in 0.0012687739999961423s: 5
[2022-01-27 11:31:00,001: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:31:00,007: INFO/MainProcess] Task celery_app.tasks.timing[fb6a1b12-ad84-4590-9bbb-4412694f5a86] received
[2022-01-27 11:31:00,012: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[fb6a1b12-ad84-4590-9bbb-4412694f5a86] succeeded in 0.003827135999998177s: ('jay', 'chou')
```
Fetching two of the stored results from Redis:
```
127.0.0.1:6379> get "celery-task-meta-fb6a1b12-ad84-4590-9bbb-4412694f5a86"
"{"status": "SUCCESS", "result": ["jay", "chou"], "traceback": null, "children": [], "date_done": "2022-01-27T03:31:00.009184", "task_id": "fb6a1b12-ad84-4590-9bbb-4412694f5a86"}"
127.0.0.1:6379> get celery-task-meta-b750444f-a66d-4fd3-8179-68712bc21f81
"{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2022-01-27T03:30:23.362510", "task_id": "b750444f-a66d-4fd3-8179-68712bc21f81"}"
```
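Because the result serializer is JSON, the value stored under `celery-task-meta-<task_id>` can be inspected with any Redis client and decoded with the standard library. A sketch decoding one of the payloads above:

```python
import json

# Payload copied from the Redis output above.
raw = ('{"status": "SUCCESS", "result": 5, "traceback": null, '
       '"children": [], "date_done": "2022-01-27T03:30:23.362510", '
       '"task_id": "b750444f-a66d-4fd3-8179-68712bc21f81"}')

meta = json.loads(raw)
print(meta["status"], meta["result"])  # SUCCESS 5
```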
Calling a non-periodic task
```python
# script/celery_run.py
from celery_app.tasks import add, cheng

def run():
    # .delay() enqueues the task and returns an AsyncResult immediately
    ret = add.delay(2, 3)
    result, status, task_id = None, None, None
    # Fetch the task result (ret.get() blocks until the task finishes)
    # result = ret.get()
    # status = ret.status
    # task_id = ret.task_id
    return result, status, task_id

if __name__ == '__main__':
    _ret = run()
    print(_ret)
```