celery实现异步任务与定时任务

celery实现异步任务与定时任务,第1张

celery实现异步任务与定时任务 1. 概览

消息队列与任务队列: 是一种能实现生产者到消费者通信的通信模型, 用于传递消息的, 如Kafka, RabbitMq. 任务队列在消息队列之上实现各种任务, 比如发邮件.

Celery: 消息中间件(Broker), 任务执行单元(Worker), 结果存储(Backend)

工作流程: Task client 提交任务到 Broker, Worker 监控 Broker 取出里面的任务并执行, 执行结果保存在Backend中.

celery应用开发: celery实例初始化, 任务定义, 启动worker, 调用任务. 

使用版本: Python 3.8.2, celery==5.2.3

2. celery应用实例

项目结构

.
├── README.md
├── celery_app  # 应用
│   ├── __init__.py  # 实例初始化
│   ├── config.py  # 相关配置
│   └── tasks.py  # 任务
├── log
│   ├── celery_app.log
├── script
│   ├── __init__.py
│   ├── celery_run.py  # celery任务调用测试

实例初始化

一般初始化, 创建实例时直接指定 broker 和 backend, 并加载配置项. broker使用rabbit, backend使用redis, 加载配置项时指定序列化方式等其他需要的配置. 

# __init__.py

# 一般初始化方式
from celery import Celery

app = Celery('celery_app', broker='amqp://sanford:123456@localhost:5672/sanford_host', backend='redis://localhost:6379/0')
app.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone = 'Asia/Shanghai',
        enable_utc=True,
    )

使用配置文件, 将相关配置写在配置文件里, 创建实例时加载配置文件. celery使用的版本不同, 配置文件里的配置项名称可能不同. 

# __init__.py

from celery import Celery

app = Celery('celery_app')

# 加载配置文件
app.config_from_object('celery_app.config')
# config.py

from celery.schedules import crontab
from kombu import Exchange, Queue

# 指定broker, 这里使用rabbitmq: 'amqp://username:password@host:port/virtual_host'
broker_url = 'amqp://sanford:123456@localhost:5672/sanford_host'

# 指定backend, 这里使用redis: 'redis://host:port/db'
result_backend = 'redis://localhost:6379/0'

# 指定任务序列化方式 json, msgpack
task_serializer = 'json'

# 指定结果序列化方式 json, msgpack
result_serializer = 'json'

# 任务过期时间 秒
result_expires = 60 * 60 * 24

# 指定任务接受的序列化类型.
accept_content = ['json', 'msgpack']

# 时区
timezone = 'Asia/Shanghai'

# 导入任务
imports = (
    'celery_app.tasks'
)

# 定时任务 celery_app.tasks.timing
beat_schedule = {
    'test_timing': {
        'task': 'celery_app.tasks.timing',
        'schedule': crontab(minute='*/1'),
        'args': ('jay', 'chou')  # 对应任务的入参
    }
}

# 设置队列, 这里设置了两个队列 timing_queue用于处理定时任务, default用于处理其他
task_queues = (
    Queue('default', exchange=Exchange(name='default', type='direct'), routing_key='default'),
    Queue('timing_queue', exchange=Exchange(name='timing_exchange', type='direct'), routing_key='routing.timing'),
)

# 设置任务对应的路由 celery_app.tasks.timing
task_routes = {
    'celery_app.tasks.timing': {
        'queue': 'timing_queue',
        'routing_key': 'routing.timing',
    },
    '*': {
        'queue': 'default',
        'routing_key': 'default'
    },
}

# 指定默认的队列, 如果一个消息设置的路由不符合, 则会进入这个队列
task_default_queue = 'default'

定义任务: 

这里定义了三个任务用于测试, 包含了异步调用和定时调用测试. 通过celery的装饰器定义任务

# tasks.py

from celery_app import app


@app.task
def add(x, y):
    return x + y


@app.task
def cheng(x, y):
    return x * y


@app.task
def timing(*args):
    return args

启动worker: 

# 发布定时任务
celery -A celery_app beat

# 启动worker
# 指定4个进程数: -c 4
# windows系统加上: -P eventlet
# 指定要处理的队列消息: -Q queue_name
celery -A celery_app worker --loglevel=info --logfile="log/celery_app.log"

# 发布定时任务并且启动worker
celery -A celery_app worker -B --loglevel=info --logfile="log/celery_app.log"
 -------------- celery@bogon v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-12.1-x86_64-i386-64bit 2022-01-27 11:29:31
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x1035d63a0
- ** ---------- .> transport:   amqp://sanford:**@localhost:5672/sanford_host
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default
                .> timing_queue     exchange=timing_exchange(direct) key=routing.timing

[tasks]
  . celery_app.tasks.add
  . celery_app.tasks.cheng
  . celery_app.tasks.timing

任务执行结果

由上面发布定时任务 celery_app.tasks.timing 并启动worker, 期间也手动调用了任务 celery_app.tasks.add, 打印出的日志如下: 

[2022-01-27 11:29:31,692: INFO/MainProcess] Connected to amqp://sanford:**@127.0.0.1:5672/sanford_host
[2022-01-27 11:29:31,700: INFO/Beat] beat: Starting...
[2022-01-27 11:29:31,701: INFO/MainProcess] mingle: searching for neighbors
[2022-01-27 11:29:31,753: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:29:32,741: INFO/MainProcess] mingle: all alone
[2022-01-27 11:29:32,769: INFO/MainProcess] celery@bogon ready.
[2022-01-27 11:29:32,800: INFO/MainProcess] Task celery_app.tasks.timing[350c3525-99a0-4bcc-91c1-8af203063a9d] received
[2022-01-27 11:29:32,913: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[350c3525-99a0-4bcc-91c1-8af203063a9d] succeeded in 0.009525484000000084s: ('jay', 'chou')
[2022-01-27 11:30:00,000: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:30:00,002: INFO/MainProcess] Task celery_app.tasks.timing[82a9c870-2e6d-408b-8428-e1fde1b618f2] received
[2022-01-27 11:30:00,006: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[82a9c870-2e6d-408b-8428-e1fde1b618f2] succeeded in 0.002058940000001286s: ('jay', 'chou')
[2022-01-27 11:30:23,361: INFO/MainProcess] Task celery_app.tasks.add[b750444f-a66d-4fd3-8179-68712bc21f81] received
[2022-01-27 11:30:23,363: INFO/ForkPoolWorker-8] Task celery_app.tasks.add[b750444f-a66d-4fd3-8179-68712bc21f81] succeeded in 0.0012687739999961423s: 5
[2022-01-27 11:31:00,001: INFO/Beat] Scheduler: Sending due task test_timing (celery_app.tasks.timing)
[2022-01-27 11:31:00,007: INFO/MainProcess] Task celery_app.tasks.timing[fb6a1b12-ad84-4590-9bbb-4412694f5a86] received
[2022-01-27 11:31:00,012: INFO/ForkPoolWorker-8] Task celery_app.tasks.timing[fb6a1b12-ad84-4590-9bbb-4412694f5a86] succeeded in 0.003827135999998177s: ('jay', 'chou')

捞出redis里的两条执行结果如下:

127.0.0.1:6379> get "celery-task-meta-fb6a1b12-ad84-4590-9bbb-4412694f5a86"
"{"status": "SUCCESS", "result": ["jay", "chou"], "traceback": null, "children": [], "date_done": "2022-01-27T03:31:00.009184", "task_id": "fb6a1b12-ad84-4590-9bbb-4412694f5a86"}"

127.0.0.1:6379> get celery-task-meta-b750444f-a66d-4fd3-8179-68712bc21f81
"{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2022-01-27T03:30:23.362510", "task_id": "b750444f-a66d-4fd3-8179-68712bc21f81"}"

非定时任务的调用测试

from celery_app.tasks import add, cheng


def run():
    ret = add.delay(2, 3)
    result, status, task_id = None, None, None

    # 获取任务结果
    # result = ret.get()
    # status = ret.status
    # task_id = ret.task_id

    return result, status, task_id


if __name__ == '__main__':
    _ret = run()
    print(_ret)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716729.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存