前提:
celery RabbitMQ或Redis ,一般都选Redis。
要安装上reids,并且能够连通redis。
目录结构
1、celery_task包保存celery项目的东西。
必须要有celery.py这个文件,其内容:
from celery import Celery #导入django环境,#加载django环境, import os import django os.environ.setdefault("DJANGO_SETTINGS_MODULE","studentSystemt.settings.dev") #studentSystemt.settings.dev 项目的配置文件所在位置 django.setup() broker = 'redis://192.168.137.10/15'#broker任务队列 backend = 'redis://192.168.137.10/14'#结构存储,执行完的结果存在这里 app = Celery(__name__,broker=broker,backend=backend,include=['celery_task.emailtask','celery_task.task2'])
2、emailtask.py文件是celery的任务,里面封装了发送邮件的函数。send_email()
from .celery import app #发送邮件 @app.task def send_email(recv,code,sender="135xxxxxxx@163.com", passward='IOJBAXPFAUJSXSUZ'): from studentSystemt.utils import email #导入可以发送邮件类,并实例化一个对象 email_obj = email.Send_mail(sender=sender, passward=passward, receivers=recv) name = 'xxx邮件' text = f'您的验证码是:{code}' head = '验证码:若非本人 *** 作,请忽略此消息!!' #'IOJBAXPFAUJSXSUZ' #调用对象的发送功能,实现发送邮件 email_obj.send(Name=name, ShowText=text, Header_show=head) return code
3、email.py是实现邮件发送的类。
import smtplib from email.header import Header from email.mime.text import MIMEText class Send_mail(object): '''发送邮件''' def __init__(self,sender,passward,receivers): self.sender=sender self.password=passward self.receivers=receivers def send(self,ShowText,Name,Header_show): ''' :param ShowText: 发送内容 :param Name: 发送者 :param Header_show: 发送文件抬头 :return: ''' message = MIMEText('%s'%(ShowText), 'plain', 'utf-8') message['From'] = Header("%s"%(Name), 'utf-8') message['To'] = Header(self.receivers) message['Subject'] = Header("%s"%(Header_show),'utf-8') smtpObj=smtplib.SMTP('smtp.163.com') smtpObj.set_debuglevel(1) smtpObj.login(self.sender,self.password) smtpObj.sendmail(self.sender,self.receivers,message.as_string()) smtpObj.quit()
4、在视图函数中导入emailtask.py模块,建立异步任务
class UtilFindPasswordView(ViewSet): @action(methods=['post'],detail=False) def checkemail(self,request,*args,**kwargs): #导入emailtask模块 from celery_task import emailtask email = request.data.get('email') code = '1234' #异步发送邮件,函数名.delay(参数) emailtask.send_email.delay(recv=email,code=code) return Response()
5、启动django项目
6、启动celery:
打开terminal:切换项目根目录下: celery -A celery_task worker -l INFO -P eventlet
F:installsotfpythonstudentSystemt>celery -A celery_task worker -l INFO -P eventlet
7、看到:说明celery框架启动了
8、可以使用postman进行测试接口
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)