集成RabbitMq+redis发送邮件

集成RabbitMq+redis发送邮件,第1张

集成RabbitMq+redis发送邮件

文章目录

依赖sql表application.yml配置文件实体类MailLog公用类MailConstantsRabbitMq配置类RabbitMqConfigService层、Mapper层代码controller层发送消息到rabbitmq消息接收,发送邮件MailReceiver定时任务检查消息是否发送(记得在启动了添加@EnableScheduling)前端邮件模板mail.html

依赖
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            com.baomidou
            mybatis-plus-boot-starter
            3.3.1.tmp
        
        
        
            io.springfox
            springfox-swagger2
            2.7.0
        
        
        
            com.github.xiaoymin
            swagger-bootstrap-ui
            1.9.6
        
        
        
            org.springframework.boot
            spring-boot-starter-mail
        
        
        
            org.springframework.boot
            spring-boot-starter-data-redis
        
sql表
DROp TABLE IF EXISTS `t_mail_log`;

CREATE TABLE `t_mail_log` (
  `msgId` VARCHAR(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息id',
  `eid` INT(11) DEFAULT NULL COMMENT '接收员工id',
  `status` INT(1) DEFAULT NULL COMMENT '状态(0:消息投递中 1:投递成功 2:投递失败)',
  `routeKey` VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '路由键',
  `exchange` VARCHAR(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '交换机',
  `count` INT(1) DEFAULT NULL COMMENT '重试次数',
  `tryTime` DATETIME DEFAULT NULL COMMENT '重试时间',
  `createTime` DATETIME DEFAULT NULL COMMENT '创建时间',
  `updateTime` DATETIME DEFAULT NULL COMMENT '更新时间',
  UNIQUE KEY `msgId` (`msgId`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;



INSERT  INTO `t_mail_log`(`msgId`,`eid`,`status`,`routeKey`,`exchange`,`count`,`tryTime`,`createTime`,`updateTime`) VALUES 
('123',538,1,'mail.routing.key','mail.exchange',0,'2021-09-04 21:11:56','2021-09-04 21:10:56','2021-09-04 21:10:56');
application.yml配置文件
spring:
  redis:
    #超时时间
    timeout: 10000ms
    #服务器地址
    host: 106.14.223.42
    #服务器端口
    port: 6379
    #数据库
    database: 0
  mail:
    host: smtp.163.com
    username: [email protected]  #QQ邮箱
    password: CVSXFAKKHTSAWCZY #授权令牌
    default-encoding: utf-8         #编码格式
    protocol: smtp                        #协议
    port: 25
  #rabbitmq配置
  rabbitmq:
    username: admin
    password: admin
    host: 106.14.223.42
    port: 5672
    #消息确认回调
    publisher-/confirm/i-type: correlated
    #消息失败回调
    publisher-returns: true
      #开启手动确认回调
    listener:
      simple:
        acknowledge-mode: manual
  thymeleaf:
    cache: false
    prefix: classpath:/templates/
    suffix: .html
  profiles:
    active: pro
comment.avatar: /images/avatar.jpg
server:
  port: 8099

mybatis-plus:
  #配置Mapper映射文件
  mapper-locations: classpath*:/mapper
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("t_mail_log")
@ApiModel(value="MailLog对象", description="")
public class MailLog implements Serializable {

    private static final long serialVersionUID = 1L;

    @ApiModelProperty(value = "消息id")
    private String msgId;

    @ApiModelProperty(value = "接收员工id")
    private Integer eid;

    @ApiModelProperty(value = "状态(0:消息投递中 1:投递成功 2:投递失败)")
    private Integer status;

    @ApiModelProperty(value = "路由键")
    private String routeKey;

    @ApiModelProperty(value = "交换机")
    private String exchange;

    @ApiModelProperty(value = "重试次数")
    private Integer count;

    @ApiModelProperty(value = "重试时间")
    private LocalDateTime tryTime;

    @ApiModelProperty(value = "创建时间")
    private LocalDateTime createTime;

    @ApiModelProperty(value = "更新时间")
    private LocalDateTime updateTime;


}

公用类MailConstants
package com.ljh.po;


public class MailConstants {
    //消息处于投递状态
    public static final Integer DELIVERING = 0;
    //消息投递成功
    public static final Integer SUCCESS = 1;
    //消息投递失败
    public static final Integer FAILURE = 2;
    //最大重试次数
    public static final Integer MAX_TRY_COUNT = 3;
    //消息超时时间
    public static final Integer MSG_TIMEOUT = 1;
    //队列
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    //交换机
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    //路由键
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
}

RabbitMq配置类RabbitMqConfig
@Configuration
public class RabbitMqConfig {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    private IMailLogService mailLogService;

    //日志
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConfig.class);


    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        
        rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause)->{
            String msgId = correlationData.getId();
            System.out.println("msgId:"+msgId);
            if (ack){
                LOGGER.info("消息发送成功========》{}",msgId);
                mailLogService.update(new UpdateWrapper().set("status",1).eq("msgId",msgId));

            }
            else {
                LOGGER.info("消息发送失败==========>{}",msgId);
            }
        });
        
        rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{
            LOGGER.info("{}============>消息发送到queue时失败"+msg.getBody());
        });

        return rabbitTemplate;
    }

    @Bean
    public Queue queue(){
        return new Queue(MailConstants.MAIL_QUEUE_NAME);
    }

    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
    }
}

Service层、Mapper层代码
public interface IMailLogService extends IService {
}


@Service
public class MailLogServiceImpl extends ServiceImpl implements IMailLogService {
}

@Repository
@Mapper
public interface MailLogMapper extends baseMapper {

}
controller层发送消息到rabbitmq
 String msgId = UUID.randomUUID().toString();
            MailLog mailLog = new MailLog();
            mailLog.setMsgId(msgId);
            mailLog.setEid(user1.getId().intValue());
            mailLog.setStatus(MailConstants.DELIVERING);
            mailLog.setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME);
            mailLog.setExchange(MailConstants.MAIL_EXCHANGE_NAME);
            mailLog.setCount(0);
            mailLog.setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT));
            mailLog.setCreateTime(LocalDateTime.now());
            mailLog.setUpdateTime(LocalDateTime.now());
            mailLogMapper.insert(mailLog);
            rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME
                    ,user1,new CorrelationData(msgId));
消息接收,发送邮件MailReceiver
@Component
public class MailReceiver {

    private static  final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    @Autowired
    private JavaMailSender javaMailSender;
    @Autowired
    private MailProperties mailProperties;
    @Autowired
    private TemplateEngine templateEngine;
    @Autowired
    private RedisTemplate redisTemplate;


    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
    public  void handler(Message message, Channel channel) throws IOException {
        MimeMessage msg = javaMailSender.createMimeMessage();
        MimeMessageHelper helper = new MimeMessageHelper(msg);

        //获取员工实体类
        User user = (User) message.getPayload();
        MessageHeaders headers = message.getHeaders();
        //获取序列号
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //获取消息id(msgId)
        String msgId = (String) headers.get("spring_returned_message_correlation");
        //声明redis
        HashOperations hashOperations = redisTemplate.opsForHash();
        try {
            //判断redis是否存在msgId,如果有,直接返回
            if (hashOperations.entries("mail_log").containsKey(msgId)){
                LOGGER.error("消息已经消费过了============》{}",msgId);
                
                channel.basicAck(tag,false);
                return;
            }
            //发件人
            helper.setFrom(mailProperties.getUsername());
            //邮箱地址
            helper.setTo(user.getEmail());
            //日期
            helper.setSentDate(new Date());
            //主题
            helper.setSubject("入职欢迎邮件");
            //邮件内容
            Context context = new Context();
            context.setVariable("name",user.getNickname());
            String mail = templateEngine.process("mail", context);
            helper.setText(mail,true);
            //发送邮件
            javaMailSender.send(msg);
            //加入redis
            hashOperations.put("mail_log",msgId,"ok");
            //手动回调
            channel.basicAck(tag,false);
        } catch (Exception e) {
            
            channel.basicNack(tag,false,true);
                LOGGER.error("邮件发送失败======={}",e.getMessage());
        }
    }

}
定时任务检查消息是否发送(记得在启动了添加@EnableScheduling)
@Component
public class MailTask {

    @Autowired
    private IMailLogService mailLogService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private loginRepository loginRepository;

    @Scheduled(cron = "0/10 * * *  * ?")//10s执行一次
    public void mailtask(){
        //首先查询状态为正在发送中的消息而且重试时间小于当前时间
        List list = mailLogService.list(new QueryWrapper()
                .eq("status", 0)
                .lt("tryTime", LocalDateTime.now()));
        //如果重试次数大于3次不在重试设置为发送失败状态
        list.forEach(mailLog -> {
            if (mailLog.getCount()>=3){
                mailLogService.update(new UpdateWrapper()
                        .set("status",2)
                        .eq("msgId",mailLog.getMsgId()));
            }
            //更新重试次数,更新时间,重试时间
            mailLogService.update(new UpdateWrapper().set("count",mailLog.getCount()+1)
            .set("updateTime",LocalDateTime.now())
            .set("tryTime",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
            .eq("msgId",mailLog.getMsgId()));
            //获取需要发送的消息信息
            User user = loginRepository.getOne(mailLog.getEid().longValue());
            //System.out.println("===============>定时task获取到的user"+user);
            //发送消息
            rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,
                    user,new CorrelationData(mailLog.getMsgId()));
        });

    }
}
前端邮件模板mail.html



    
    欢迎邮件


欢迎 ,成功注册在线笔记系统



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705721.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存