- 依赖
- sql表
- application.yml配置文件
- 实体类MailLog
- 公用类MailConstants
- RabbitMq配置类RabbitMqConfig
- Service层、Mapper层代码
- controller层发送消息到rabbitmq
- 消息接收,发送邮件MailReceiver
- 定时任务检查消息是否发送(记得在启动类添加@EnableScheduling)
- 前端邮件模板mail.html
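依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1.tmp</version>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>com.github.xiaoymin</groupId>
    <artifactId>swagger-bootstrap-ui</artifactId>
    <version>1.9.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
sql表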
-- Delivery log for mail messages: one row per message id (enforced by the unique key).
DROP TABLE IF EXISTS `t_mail_log`;

CREATE TABLE `t_mail_log` (
    `msgId`      VARCHAR(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息id',
    `eid`        INT(11) DEFAULT NULL COMMENT '接收员工id',
    `status`     INT(1) DEFAULT NULL COMMENT '状态(0:消息投递中 1:投递成功 2:投递失败)',
    `routeKey`   VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '路由键',
    `exchange`   VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '交换机',
    `count`      INT(1) DEFAULT NULL COMMENT '重试次数',
    `tryTime`    DATETIME DEFAULT NULL COMMENT '重试时间',
    `createTime` DATETIME DEFAULT NULL COMMENT '创建时间',
    `updateTime` DATETIME DEFAULT NULL COMMENT '更新时间',
    UNIQUE KEY `msgId` (`msgId`) USING BTREE
) ENGINE = INNODB DEFAULT CHARSET = utf8 ROW_FORMAT = DYNAMIC;

-- Sample row: a message already marked as delivered (status 1) with no retries.
INSERT INTO `t_mail_log`
    (`msgId`, `eid`, `status`, `routeKey`, `exchange`, `count`, `tryTime`, `createTime`, `updateTime`)
VALUES
    ('123', 538, 1, 'mail.routing.key', 'mail.exchange', 0,
     '2021-09-04 21:11:56', '2021-09-04 21:10:56', '2021-09-04 21:10:56');
application.yml配置文件
spring: redis: #超时时间 timeout: 10000ms #服务器地址 host: 106.14.223.42 #服务器端口 port: 6379 #数据库 database: 0 mail: host: smtp.163.com username: [email protected] #QQ邮箱 password: CVSXFAKKHTSAWCZY #授权令牌 default-encoding: utf-8 #编码格式 protocol: smtp #协议 port: 25 #rabbitmq配置 rabbitmq: username: admin password: admin host: 106.14.223.42 port: 5672 #消息确认回调 publisher-/confirm/i-type: correlated #消息失败回调 publisher-returns: true #开启手动确认回调 listener: simple: acknowledge-mode: manual thymeleaf: cache: false prefix: classpath:/templates/ suffix: .html profiles: active: pro comment.avatar: /images/avatar.jpg server: port: 8099 mybatis-plus: #配置Mapper映射文件 mapper-locations: classpath*:/mapper @Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) @TableName("t_mail_log") @ApiModel(value="MailLog对象", description="") public class MailLog implements Serializable { private static final long serialVersionUID = 1L; @ApiModelProperty(value = "消息id") private String msgId; @ApiModelProperty(value = "接收员工id") private Integer eid; @ApiModelProperty(value = "状态(0:消息投递中 1:投递成功 2:投递失败)") private Integer status; @ApiModelProperty(value = "路由键") private String routeKey; @ApiModelProperty(value = "交换机") private String exchange; @ApiModelProperty(value = "重试次数") private Integer count; @ApiModelProperty(value = "重试时间") private LocalDateTime tryTime; @ApiModelProperty(value = "创建时间") private LocalDateTime createTime; @ApiModelProperty(value = "更新时间") private LocalDateTime updateTime; }公用类MailConstants
package com.ljh.po;

/**
 * Shared constants for the mail-delivery pipeline: delivery status codes, retry
 * limits and the RabbitMQ queue, exchange and routing-key names.
 *
 * <p>This is a pure constants holder, so it cannot be instantiated or subclassed.
 */
public final class MailConstants {

    /** Message is still being delivered. */
    public static final Integer DELIVERING = 0;

    /** Message was delivered successfully. */
    public static final Integer SUCCESS = 1;

    /** Message delivery failed. */
    public static final Integer FAILURE = 2;

    /** Maximum number of retries. */
    public static final Integer MAX_TRY_COUNT = 3;

    /** Message timeout; callers pass it to {@code LocalDateTime.plusMinutes}, i.e. minutes. */
    public static final Integer MSG_TIMEOUT = 1;

    /** Queue name. */
    public static final String MAIL_QUEUE_NAME = "mail.queue";

    /** Exchange name. */
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";

    /** Routing key. */
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    private MailConstants() {
        // constants only; no instances
    }
}
RabbitMq配置类RabbitMqConfig
@Configuration public class RabbitMqConfig { @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private IMailLogService mailLogService; //日志 private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConfig.class); @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause)->{ String msgId = correlationData.getId(); System.out.println("msgId:"+msgId); if (ack){ LOGGER.info("消息发送成功========》{}",msgId); mailLogService.update(new UpdateWrapperService层、Mapper层代码().set("status",1).eq("msgId",msgId)); } else { LOGGER.info("消息发送失败==========>{}",msgId); } }); rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{ LOGGER.info("{}============>消息发送到queue时失败"+msg.getBody()); }); return rabbitTemplate; } @Bean public Queue queue(){ return new Queue(MailConstants.MAIL_QUEUE_NAME); } @Bean public DirectExchange exchange(){ return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME); } }
public interface IMailLogService extends IServicecontroller层发送消息到rabbitmq{ } @Service public class MailLogServiceImpl extends ServiceImpl implements IMailLogService { } @Repository @Mapper public interface MailLogMapper extends baseMapper { }
// Generate a fresh id that ties the log row to the published message.
String msgId = UUID.randomUUID().toString();

// Build the delivery-log row (status: delivering, zero retries) using the chained setters.
MailLog mailLog = new MailLog()
        .setMsgId(msgId)
        .setEid(user1.getId().intValue())
        .setStatus(MailConstants.DELIVERING)
        .setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME)
        .setExchange(MailConstants.MAIL_EXCHANGE_NAME)
        .setCount(0)
        .setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
        .setCreateTime(LocalDateTime.now())
        .setUpdateTime(LocalDateTime.now());

// Persist the row, then publish the user with the id attached as correlation data.
mailLogMapper.insert(mailLog);
CorrelationData correlation = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(
        MailConstants.MAIL_EXCHANGE_NAME,
        MailConstants.MAIL_ROUTING_KEY_NAME,
        user1,
        correlation);
消息接收,发送邮件MailReceiver
@Component public class MailReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); @Autowired private JavaMailSender javaMailSender; @Autowired private MailProperties mailProperties; @Autowired private TemplateEngine templateEngine; @Autowired private RedisTemplate redisTemplate; @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME) public void handler(Message message, Channel channel) throws IOException { MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); //获取员工实体类 User user = (User) message.getPayload(); MessageHeaders headers = message.getHeaders(); //获取序列号 Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); //获取消息id(msgId) String msgId = (String) headers.get("spring_returned_message_correlation"); //声明redis HashOperations hashOperations = redisTemplate.opsForHash(); try { //判断redis是否存在msgId,如果有,直接返回 if (hashOperations.entries("mail_log").containsKey(msgId)){ LOGGER.error("消息已经消费过了============》{}",msgId); channel.basicAck(tag,false); return; } //发件人 helper.setFrom(mailProperties.getUsername()); //邮箱地址 helper.setTo(user.getEmail()); //日期 helper.setSentDate(new Date()); //主题 helper.setSubject("入职欢迎邮件"); //邮件内容 Context context = new Context(); context.setVariable("name",user.getNickname()); String mail = templateEngine.process("mail", context); helper.setText(mail,true); //发送邮件 javaMailSender.send(msg); //加入redis hashOperations.put("mail_log",msgId,"ok"); //手动回调 channel.basicAck(tag,false); } catch (Exception e) { channel.basicNack(tag,false,true); LOGGER.error("邮件发送失败======={}",e.getMessage()); } } }定时任务检查消息是否发送(记得在启动了添加@EnableScheduling)
@Component public class MailTask { @Autowired private IMailLogService mailLogService; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private loginRepository loginRepository; @Scheduled(cron = "0/10 * * * * ?")//10s执行一次 public void mailtask(){ //首先查询状态为正在发送中的消息而且重试时间小于当前时间 List前端邮件模板mail.htmllist = mailLogService.list(new QueryWrapper () .eq("status", 0) .lt("tryTime", LocalDateTime.now())); //如果重试次数大于3次不在重试设置为发送失败状态 list.forEach(mailLog -> { if (mailLog.getCount()>=3){ mailLogService.update(new UpdateWrapper () .set("status",2) .eq("msgId",mailLog.getMsgId())); } //更新重试次数,更新时间,重试时间 mailLogService.update(new UpdateWrapper ().set("count",mailLog.getCount()+1) .set("updateTime",LocalDateTime.now()) .set("tryTime",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT)) .eq("msgId",mailLog.getMsgId())); //获取需要发送的消息信息 User user = loginRepository.getOne(mailLog.getEid().longValue()); //System.out.println("===============>定时task获取到的user"+user); //发送消息 rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME, user,new CorrelationData(mailLog.getMsgId())); }); } }
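<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>欢迎邮件</title>
</head>
<body>
欢迎 <span th:text="${name}"></span>,成功注册在线笔记系统
</body>
</html>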
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)