SpringBoot整合RabbitMQ详细应用教程(2)-发送消息确认回调及手动确认消息

SpringBoot整合RabbitMQ详细应用教程(2)-发送消息确认回调及手动确认消息,第1张

SpringBoot整合RabbitMQ详细应用教程(2)-发送消息确认回调及手动确认消息

一、导入maven依赖,我使用的版本和parent的版本一致2.3.12.RELEASE

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

 二、开启/confirm/iCallback、ReturnCallback及手动ack配置

spring:
  rabbitmq:
    host: 101.xxx.xx.xx
    port: 5672
    username: admin
    password: admin

    #开启发送到交换机确认callback
    publisher-/confirm/i-type: correlated
    #开启发送到队列失败returnCallback
    publisher-returns: true
    #消息是否强制回退 如果此值为空才取publisher-returns值
    template:
      mandatory: true
    #开启手动确认消息
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

三、新建类继承/confirm/iCallbak、ReturnCallback,具体可见如下demo

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Service
@Slf4j
public class SendWithConfirmService implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info("消息已发送到交换器 cause:{} - {}" , s , correlationData.toString());
        } else {
            log.info("消息未发送到交换器 cause:{} - {}" , s , correlationData.toString());
        }
    }

    
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("消息被退回 {}" , message.toString());
    }

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Transactional(rollbackFor = Exception.class)
    public void send(String exchange,String routingKey,String data) {
        //注意如果需要confirmCallBack 需要传CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);
    }
}

四、手动确认消费消息和未消费消息,channel.basicAck、channel.basicNack使用及参数详解如下

    @RabbitListener(queues = "TestDireQueue3")
    public void receive3(String data, Message message,Channel channel) {
        log.info("TestDireQueue3 receive message================={}",data);
        try {
            //第一个参数是消息的index(可以理解为消息的唯一id)
            //第二个参数是是否开启批量模式,true-一次ack所有消息该消息index的消息,提高效率
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            
            //未被消费
            //前两个参数和basicAck一样
            //第三个参数为 是否重新回到队列
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            log.info("TestDireQueue3中{}已被消费",data);
        } catch (IOException e) {
            log.error("error",e);
        }
    }

五、需要注意的一些地方及问题

1、如果开启了/confirm/iCallback、returnCallback可一在回调的方法做些额外处理,例如结合数据库进一步保证消息100%投递,或者重发,提示等等 *** 作,但是需要注意的是高并发处理消息的效率会降低

2、配置如果开启了手动ack,所有的消息都需要手动确认,不然消息会一直存在队列中只是状态有所变化为unack,会被不断重新消费,所以一定要调用channel.basicAck方法

3、消费消息不要抛出异常,异常中断也不会正常消费消息,会导致消息死循环一直重复消费

4、小概率出现问题,特别是刚接触时,注意Channel的引用包import com.rabbitmq.client.Channel,因为有很多Channel的包,引错的话会报错org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.nio.channels.Channel] for GenericMessage……,注意查看报错信息也是可以发现的,是因为引用了java.nio.channels.Channel

后续记录创建队列时其它的属性参数设置,例如设置消息的有效持续时间即TTL,死信队列,变相的延迟队列等等

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5684453.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存