首先要清楚rabbitmq基础知识
生产者:生产消息
消费者:消费监听指定队列的消息
配置类:将交换机队列绑定到一起
引入依赖:
org.springframework.boot spring-boot-starter-amqpcn.hutool hutool-all5.4.5 org.projectlombok lombokprovided 1.18.16
创建生产者:
@Component @Slf4j public class SendUserIntegralRabbitMqProducer { @Resource private RabbitTemplate rabbitTemplate; public void send(SendUserIntegralDTO dto) { try { rabbitTemplate.convertAndSend(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL, MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL, JSONUtil.toJsonStr(dto)); } catch (Exception e) { log.error("----------------------发送用户获取积分消息到 RabbitMQ 异常:", e); } } }
使用生产者:
if (CommonConstant.TWO.equals(query.getIsPass())) { SendUserIntegralDTO dto = new SendUserIntegralDTO(); dto.setIntegral(examineIntegralPO.getGrantIntegral()); dto.setExamineId(examineIntegralPO.getId()); sendUserIntegralRabbitMqProducer.send(dto); }
消费者代码:
@Slf4j @Component public class UserIntegralRabbitMqConsumer { @Resource private IntegralService integralService; @RabbitListener(queues = {MqConstant.CHECK_QUEUE_USER_INTEGRAL}) //将生产者绑定到队列上 public void checkUserIntegrallistener(Message message, Channel channel) { String jsonStr = JSONUtil.toJsonStr(new String(message.getBody())); try { SendUserIntegralDTO dto = JSONUtil.toBean(jsonStr, SendUserIntegralDTO.class); integralService.addUserIntegral(dto); } catch (BizException e) { log.error("------------调整用户积分业务异常:", e); } catch (Exception e) { log.error("------------调整用户积分未捕获异常:", e); } finally { // 手动进行应答 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error("------------用户获取积分消费者 ack 失败:", e); } } } }
配置类:
@Configuration @Slf4j public class RabbitMqConfig { @Bean(MqConstant.CHECK_QUEUE_USER_INTEGRAL) public Queue checkQueueUserIntegral(){ return new Queue(MqConstant.CHECK_QUEUE_USER_INTEGRAL); } @Bean public Binding bindingTopicByCheckUserIntegral( @Qualifier(MqConstant.CHECK_QUEUE_USER_INTEGRAL) Queue queue, @Qualifier(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL).noargs(); } @Bean(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL) public Exchange exchangeTopicByCheckUserIntegral(){ return ExchangeBuilder.topicExchange(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL).durable(true).build(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)