如何使用Celery和Docker处理Django中的定期任务

如何使用Celery和Docker处理Django中的定期任务,第1张

如何使用Celery和Docker处理Django中的定期任务

本文主要介绍在Django中使用芹菜和Docker处理常规任务的方法。本文通过示例代码非常详细的为您介绍,对您的学习或工作有一定的参考价值,有需要的朋友可以参考一下。

在构建和扩展Django应用程序时,不可避免地会定期在后台自动运行某些任务。

一些例子:

生成定期报告

清除缓存

发送批量电子邮件通知

每晚执行维护工作。

这是构建和扩展并非Django核心的Web应用程序所需的少数功能之一。幸运的是,芹菜提供了一个强大的解决方案,它非常容易实现,并被称为芹菜节拍。

在下面的文章中,我们将向您展示如何使用Docker来设置Django、Celery和Redis,以便可以通过CeleryBeat定期运行自定义的Django管理命令。

依赖性:

Djangov3.0.5

文档编号v19.03.8

python3.8.2版

芹菜4.4.1版

Redisv5.0.8

姜戈+芹菜系列:

姜戈和芹菜的异步任务

使用芹菜和Docker处理Django中的常规任务(本文!)

目标

学完本教程后,您应该能够:

集装箱姜戈,芹菜和Redis与码头

将Celery集成到Django应用程序中并创建任务。

编写自定义Django管理命令

安排定制的Django管理命令通过CeleryBeat定期运行。

项目设置

从django-celery-beat存储库中克隆基本项目,然后签出基本分支:

$gitclone
https://github.com/testdrivenio/django-celery-beat
-branchbase-single-branch
$CDdjango-celery-beat

由于我们总共需要管理四个进程(Django、Redis、worker和Scheduler),我们将使用Docker通过连接它们来简化它们的工作流,这样它们都可以通过一个命令从终端窗口运行。

从项目根目录创建一个映像,并启动Docker容器:

$docker-composeup-d--build $docker-composeexecwebpythonmanage.pymigrate

构建完成后,导航到http://localhost:1337以确保应用程序可以按预期运行。您应该会看到以下文本:

订单
未找到订单!

项目结构:

├──.gitignore
├──docker-compose.yml
└──项目
├──dockerfile
├──核心
│├──__init__。py
│├──asgi.py
│├──settings.py
│├──URLs.py
│└──wsgi.py
├──entrypoint.sh
├──manage.py
├──订单
│├──__init__py
│├──admin.py
│├──apps.py
│├──移民
│├──0001_initial.py
│└──__init__。py
[/h/]│├──模特.py
│├──测试.py
│├──URLs.py
│└──观点.py
├──要求.txt
└──模板
└──订单
└──订单_列表.html

芹菜和雷迪斯

现在,我们需要为芹菜、芹菜泥和Redis添加容器。

首先,将依赖项添加到requirements.txt文件中:

Django==3.0.5 celery==4.4.1 redis==3.4.1

Docker-compose.yml文件内容:

redis: image:redis:alpine celery: build:./project command:celery-Acoreworker-linfo volumes: -./project/:/usr/src/app/ environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis celery-beat: build:./project command:celery-Acorebeat-linfo volumes: -./project/:/usr/src/app/ environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis

我们还需要更新Web服务的depends_on部分:

web: build:./project command:pythonmanage.pyrunserver0.0.0.0:8000 volumes: -./project/:/usr/src/app/ ports: -1337:8000 environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis#NEW

完整的docker-compose文件如下所示:

version:'3.7' services: web: build:./project command:pythonmanage.pyrunserver0.0.0.0:8000 volumes: -./project/:/usr/src/app/ ports: -1337:8000 environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis redis: image:redis:alpine celery: build:./project command:celery-Acoreworker-linfo volumes: -./project/:/usr/src/app/ environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis celery-beat: build:./project command:celery-Acorebeat-linfo volumes: -./project/:/usr/src/app/ environment: -DEBUG=1 -SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m -DJANGO_ALLOWED_HOSTS=localhost127.0.0.1[::1] depends_on: -redis

在构建新容器之前,我们需要在Django应用程序中配置Celery。

芹菜配置

设置

在“core”目录中,创建一个celery.py文件,并添加以下代码:

importos fromceleryimportCelery os.environ.setdefault("DJANGO_SETTINGS_MODULE","core.settings") app=Celery("core") app.config_from_object("django.conf:settings",namespace="CELERY") app.autodiscover_tasks()

这里发生了什么?

首先,我们为DJANGO_SETTINGS_MODULE环境变量设置一个默认值,以便Celery知道如何找到DJANGO项目。

接下来,我们创建了一个名为core的新Celery实例,并将值赋给名为app的变量。

然后,我们从django.conf的settings对象加载celery配置值,我们使用namespace="CELERY"来防止与其他django设置冲突。换句话说,芹菜的所有配置设置都必须以芹菜_为前缀。

最后,app.autodiscover_tasks()告诉Celery从设置中定义的应用程序中查找Celery任务。已安装的应用程序

将以下代码添加到core/__init__。py:

from.celeryimportappascelery_app __all__=("celery_app",)

最后,使用以下Celery设置更新core/settings.py文件,以便它可以连接到Redis:

CELERY_BROKER_URL="redis://redis:6379" CELERY_RESULT_BACKEND="redis://redis:6379"

构建:

$docker-composeup-d--build

查看日志:

$docker-composelogs'web' $docker-composelogs'celery' $docker-composelogs'celery-beat' $docker-composelogs'redis'

如果一切顺利,我们现在有四个容器,每个容器提供不同的服务。

现在,我们准备创建一个示例任务,看看它是否能正常工作。

创建任务

创建一个新文件core/tasks.py,并为仅打印到控制台的示例任务添加以下代码:

fromceleryimportshared_task @shared_task defsample_task(): print("Thesampletaskjustran.")

安排任务

在settings.py文件的末尾,添加以下代码,使用CeleryBeat将sample_task安排为每分钟运行一次:

CELERY_BEAT_SCHEDULE={ "sample_task":{ "task":"core.tasks.sample_task", "schedule":crontab(minute="*/1"), }, }

这里,我们使用CELERY_BEAT_SCHEDULE设置定义一个周期性任务。我们将任务命名为sample_task,然后声明了两个设置:

Task声明要运行的任务。

时间表设置任务运行的时间间隔。这可以是整数、时间增量或crontab。我们在任务中使用了crontab模式,并告诉它每分钟运行一次。你可以在这里找到更多关于芹菜时间表的信息。

确保添加导入:

fromcelery.schedulesimportcrontab importcore.tasks

重新启动容器并应用更改:

$docker-composeup-d--build

查看日志:

$docker-composelogs-f'celery' celery_1|--------------[queues] celery_1|.>celeryexchange=celery(direct)key=celery celery_1| celery_1| celery_1|[tasks] celery_1|.core.tasks.sample_task

我们可以看到芹菜得到了样本任务core.tasks.sample_task。

每一分钟,您都应该在日志中看到一行以“Sampletaskjustrunning”结尾的内容:

celery_1|[2020-04-1522:49:00,003:INFO/mainprocess]
Receivedtask:core.tasks.sample_task[8E5a84f-c54b-4e41-945b-645765E7b20a]
celery_1|[2020-04-1522:49:00,007:WARNING/ForkPoolWorker-1]

自定义Django管理命令

Django提供了许多内置的django-admin命令,例如:

移动

启动一个项目

文件路径

转储数据

移民

除了内置命令之外,Django还为我们提供了创建自定义命令的选项:

自定义命令对于运行独立脚本或从UNIXcrontab或Windows计划任务控制面板定期执行的脚本特别有用。

因此,我们将首先配置一个新命令,然后使用CeleryBeat自动运行它。

首先,创建一个名为orders/management/commands/my_custom_command.py的新文件。

fromdjango.core.management.baseimportBaseCommand,CommandError classCommand(BaseCommand): help="Adescriptionofthecommand" defhandle(self,*args,**options): pass

BaseCommand有一些可以被覆盖的方法,但是唯一需要的方法是handle。Handle是自定义命令的入口点。换句话说,这个方法将在我们运行命令时被调用。

出于测试目的,我们通常只添加一个快速打印语句。但是,根据Django文档,建议使用stdout.write:

当您使用管理命令并希望提供控制台输出时,您应该编写self.stdout和self.stderr,而不是直接打印它们。使用这些代理可以更容易地测试定制命令。还要注意,您不需要用换行符来结束消息,它会自动添加,除非您指定了end参数。

因此,添加self.stdout.write命令:

fromdjango.core.management.baseimportBaseCommand,CommandError classCommand(BaseCommand): help="Adescriptionofthecommand" defhandle(self,*args,**options): self.stdout.write("Mysamplecommandjustran.")#NEW

测试:

$docker-composeexecwebpythonmanage.pymy_custom_command Mysamplecommandjustran.

这样的话,让我们把所有的东西捆绑在一起!

使用芹菜节拍计划自定义命令

现在我们已经启动并运行了容器,它已经过测试,我们可以安排任务定期运行,并且我们已经编写了定制的DjangoAdmin示例命令。现在是时候设置定期运行自定义命令了。

设置

在这个项目中,我们有一个非常基本的应用程序,叫做Order。它包含两个模型,产品和订单。让我们创建一个自定义命令,发送一个电子邮件报告,确认当天的订单。

首先,我们将通过这个项目中包含的设备向数据库添加一些产品和订单:

$docker-composeexecwebpythonmanage.pyloaddataproducts.json

创建超级用户:

$docker-composeexecwebpythonmanage.pycreatesuperuser

出现提示时,请填写您的用户名、电子邮件和密码。然后在Web浏览器中导航到http://127.0.0.1:1337/admin。使用刚刚创建的超级用户登录,创建几个订单。确保至少有一个日期是今天。

让我们为电子邮件报告创建一个新的自定义命令。

创建一个名为orders/management/commands/email_report.py的文件:

fromdatetimeimporttimedelta,time,datetime fromdjango.core.mailimportmail_admins fromdjango.core.managementimportBaseCommand fromdjango.utilsimporttimezone fromdjango.utils.timezoneimportmake_aware fromorders.modelsimportOrder today=timezone.now() tomorrow=today+timedelta(1) today_start=make_aware(datetime.combine(today,time())) today_end=make_aware(datetime.combine(tomorrow,time())) classCommand(BaseCommand): help="SendToday'sOrdersReporttoAdmins" defhandle(self,*args,**options): orders=Order.objects.filter(confirmed_date__range=(today_start,today_end)) iforders: message="" fororderinorders: message+=f"{order}\n" subject=( f"OrderReportfor{today_start.strftime('%Y-%m-%d')}" f"to{today_end.strftime('%Y-%m-%d')}" ) mail_admins(subject=subject,message=message,html_message=None) self.stdout.write("E-mailReportwassent.") else: self.stdout.write("Noordersconfirmedtoday.")

在代码中,我们在数据库中查询日期为Confirmed_date的订单,将订单合并成电子邮件正文中的一条消息,然后使用Django内置的mail_admins命令将电子邮件发送给管理员。

添加虚拟管理员电子邮件,并将EMAIL_BACKEND设置为使用控制台后端,以便可以将电子邮件发送到设置文件中的stdout:

EMAIL_BACKEND="django.core.mail.backends.console.EmailBackend" DEFAULT_FROM_EMAIL="noreply@email.com" ADMINS=[("testuser","test.user@email.com"),]

运行:

$docker-composeexecwebpythonmanage.pyemail_report
Content-Type:text/plain;charset="utf-8"
MIME-Version:1.0
Content-Transfer-Encoding:7bit
Subject:[Django]2020-04-15至2020-04-16
订单报告发件人:root@localhost
发件人:test.user@email.com
1586992245.495663971825@5ce6313185d3>;

订单:337ef21c-5f53-4761-9f81-07945de385AE-产品:大米

-
邮件报告已发送。

芹菜搅打

现在,我们需要创建一个常规任务来每天运行这个命令。

向core/tasks.py添加新任务:

fromceleryimportshared_task fromdjango.core.managementimportcall_command#NEW @shared_task defsample_task(): print("Thesampletaskjustran.") #NEW @shared_task defsend_email_report(): call_command("email_report",)

所以,首先,我们添加了一个call_command导入,用于以编程方式调用django-admin命令。在新任务中,使用call_command作为参数,并附带自定义命令的名称。

要计划此任务,请打开core/settings.py文件,并更新CELERY_BEAT_SCHEDULE设置以包含新任务。

CELERY_BEAT_SCHEDULE={ "sample_task":{ "task":"core.tasks.sample_task", "schedule":crontab(minute="*/1"), }, "send_email_report":{ "task":"core.tasks.send_email_report", "schedule":crontab(hour="*/1"), }, }

在这里,我们向CELERY_BEAT_SCHEDULE添加了一个名为send_email_report的新条目。正如我们对上一个任务所做的那样,我们声明了该任务应该运行的任务——例如,core.tasks.send_email_report——并使用crontab模式设置可重复性。

重新启动容器以确保新设置处于活动状态:

$docker-composeup-d--build 看日志: $docker-composelogs-f'celery' celery_1|--------------[queues] celery_1|.>celeryexchange=celery(direct)key=celery celery_1| celery_1| celery_1|[tasks] celery_1|.core.tasks.sample_task celery_1|.core.tasks.send_email_report

一分钟后,邮件发出去了:

celery_1|[2020-04-1523:20:00,309:WARNING/ForkPoolWorker-1]内容-类型:text/plain;charset="utf-8"
celery_1|MIME-Version:1.0
celery_1|Content-Transfer-Encoding:7bit
celery_1|Subject:[Django]2020-04-15至2020-04-16
celery_1|From:root@localhost
celery_115869928000.12.42422500683251@42481c198b77>;
芹菜_1|
芹菜_1|订单:337ef21c-5f53-4761-9f81-07945de385AE-产品:大米
芹菜_1|[2020-04-1523:20:00,310:WARNING/forkpoolworker-1]-
芹菜_1|。

结论

在本文中,我们将指导您为芹菜、芹菜泥和Redis设置Docker容器。然后,我们展示了如何使用CeleryBeat创建一个定制的Django管理命令和一个定期任务来自动运行该命令。

原文:https://testdriven.io/blog/django-celery-periodic-tasks/

关于如何使用芹菜和Docker来处理Django的常规任务的这篇文章到此为止。关于CeleryDocker如何处理Django的常规任务的更多信息,请搜索我们以前的文章或继续浏览下面的相关文章。希望大家以后能多多支持我们!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/774536.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-03
下一篇 2022-05-03

发表评论

登录后才能评论

评论列表(0条)

保存