一、导入maven依赖,我使用的版本和parent的版本一致2.3.12.RELEASE
org.springframework.boot spring-boot-starter-amqp
二、开启/confirm/iCallback、ReturnCallback及手动ack配置
spring: rabbitmq: host: 101.xxx.xx.xx port: 5672 username: admin password: admin #开启发送到交换机确认callback publisher-/confirm/i-type: correlated #开启发送到队列失败returnCallback publisher-returns: true #消息是否强制回退 如果此值为空才取publisher-returns值 template: mandatory: true #开启手动确认消息 listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual
三、新建类继承/confirm/iCallbak、ReturnCallback,具体可见如下demo
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import java.util.UUID; @Service @Slf4j public class SendWithConfirmService implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if (ack) { log.info("消息已发送到交换器 cause:{} - {}" , s , correlationData.toString()); } else { log.info("消息未发送到交换器 cause:{} - {}" , s , correlationData.toString()); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { log.info("消息被退回 {}" , message.toString()); } @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Transactional(rollbackFor = Exception.class) public void send(String exchange,String routingKey,String data) { //注意如果需要confirmCallBack 需要传CorrelationData CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData); } }
四、手动确认消费消息和未消费消息,channel.basicAck、channel.basicNack使用及参数详解如下
@RabbitListener(queues = "TestDireQueue3") public void receive3(String data, Message message,Channel channel) { log.info("TestDireQueue3 receive message================={}",data); try { //第一个参数是消息的index(可以理解为消息的唯一id) //第二个参数是是否开启批量模式,true-一次ack所有消息该消息index的消息,提高效率 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); //未被消费 //前两个参数和basicAck一样 //第三个参数为 是否重新回到队列 //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); log.info("TestDireQueue3中{}已被消费",data); } catch (IOException e) { log.error("error",e); } }
五、需要注意的一些地方及问题
1、如果开启了/confirm/iCallback、returnCallback可一在回调的方法做些额外处理,例如结合数据库进一步保证消息100%投递,或者重发,提示等等 *** 作,但是需要注意的是高并发处理消息的效率会降低
2、配置如果开启了手动ack,所有的消息都需要手动确认,不然消息会一直存在队列中只是状态有所变化为unack,会被不断重新消费,所以一定要调用channel.basicAck方法
3、消费消息不要抛出异常,异常中断也不会正常消费消息,会导致消息死循环一直重复消费
4、小概率出现问题,特别是刚接触时,注意Channel的引用包import com.rabbitmq.client.Channel,因为有很多Channel的包,引错的话会报错org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.nio.channels.Channel] for GenericMessage……,注意查看报错信息也是可以发现的,是因为引用了java.nio.channels.Channel
后续记录创建队列时其它的属性参数设置,例如设置消息的有效持续时间即TTL,死信队列,变相的延迟队列等等
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)