RabbitMq实战使用

RabbitMq实战使用,第1张

RabbitMq实战使用

首先要清楚rabbitmq基础知识

生产者:生产消息

消费者:消费监听指定队列的消息

配置类:将交换机队列绑定到一起

引入依赖:


   org.springframework.boot
   spring-boot-starter-amqp




    cn.hutool
    hutool-all
    5.4.5



    org.projectlombok
    lombok
    provided
    1.18.16

创建生产者

@Component
@Slf4j
public class SendUserIntegralRabbitMqProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;


    public void send(SendUserIntegralDTO dto) {
        try {
            rabbitTemplate.convertAndSend(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL,
                    MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL,
                    JSONUtil.toJsonStr(dto));
        } catch (Exception e) {
            log.error("----------------------发送用户获取积分消息到 RabbitMQ 异常:", e);
        }
    }
}

使用生产者:

 if (CommonConstant.TWO.equals(query.getIsPass())) {
                SendUserIntegralDTO dto = new SendUserIntegralDTO();
                dto.setIntegral(examineIntegralPO.getGrantIntegral());
                dto.setExamineId(examineIntegralPO.getId());
                sendUserIntegralRabbitMqProducer.send(dto);
            }

消费者代码:

@Slf4j
@Component
public class UserIntegralRabbitMqConsumer {

    @Resource
    private IntegralService integralService;

  @RabbitListener(queues = {MqConstant.CHECK_QUEUE_USER_INTEGRAL}) //将生产者绑定到队列上
    public void checkUserIntegrallistener(Message message, Channel channel) {
        String jsonStr = JSONUtil.toJsonStr(new String(message.getBody()));
        try {
            SendUserIntegralDTO dto = JSONUtil.toBean(jsonStr, SendUserIntegralDTO.class);
            integralService.addUserIntegral(dto);
        } catch (BizException e) {
            log.error("------------调整用户积分业务异常:", e);
        } catch (Exception e) {
            log.error("------------调整用户积分未捕获异常:", e);
        } finally {
            // 手动进行应答
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.error("------------用户获取积分消费者 ack 失败:", e);
            }
        }
    }
}

配置类:
 

@Configuration
@Slf4j
public class RabbitMqConfig {

        
    @Bean(MqConstant.CHECK_QUEUE_USER_INTEGRAL)
    public Queue checkQueueUserIntegral(){
        return new Queue(MqConstant.CHECK_QUEUE_USER_INTEGRAL);
    }

    
    @Bean
    public Binding bindingTopicByCheckUserIntegral(
            @Qualifier(MqConstant.CHECK_QUEUE_USER_INTEGRAL) Queue queue,
            @Qualifier(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.CHECK_ROUTING_KEY_USER_INTEGRAL).noargs();
    }

    
    @Bean(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL)
    public Exchange exchangeTopicByCheckUserIntegral(){
        return ExchangeBuilder.topicExchange(MqConstant.CHECK_EXCHANGE_TOPIC_USER_INTEGRAL).durable(true).build();
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5696079.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存