- 消息代理(message broker)和目的地(destination)
- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
- 队列(queue):点对点消息通信(point-to-point)
- 主题(topic):发布(publish)
@Test
void createExchange() {
// public DirectExchange(String name, 交换机名称
// boolean durable, 是否持久化
// boolean autoDelete, 是否自动删除
// Map
arguments) 参数 DirectExchange directExchange = new DirectExchange("test_exchange",false,false,null); amqpAdmin.declareExchange(directExchange); } } 5.4.2 创建队列 @Test void createQueue() { // public Queue(String name, 队列名称 // boolean durable, 是否持久化 // boolean exclusive, 是否是排他队列(只能被一个consumer的连接占用) // boolean autoDelete, 是否自动删除 // @Nullable Map
5.4.3 创建绑定arguments) 参数 Queue queue = new Queue("test_queue",true,false,false); amqpAdmin.declareQueue(queue); } @Test void createBinding() { // public Binding(String destination 【目的地,队列name或 交换机name(如果作为路由的话)】 // Binding.DestinationType destinationType, 【目的地类型 queue还是exchange(路由)】 // String exchange, 【交换机】 // String routingKey, 【路由键】 // @Nullable Map
5.4.4 测试以上步骤是否有问题 5.4.5 发送消息arguments) 【自定义参数】 Binding binding = new Binding("test_queue", Binding.DestinationType.QUEUE,"test_exchange","test.binding",null); amqpAdmin.declareBinding(binding); } 如果消息内容是一个对象,那么该对象必须实现Serializable接口,因为convertAndSend()默认使用的jdk序列化
@Test void sendMessage() { // public void convertAndSend(String exchange, 交换机 // String routingKey, 路由键 // Object message, 发送的消息 // MessagePostProcessor messagePostProcessor) 消息序列化器 OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setName("sendMessageTest"); orderReturnReasonEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("test_exchange","test.binding",orderReturnReasonEntity); log.info("消息发送完成"); }
5.4.5.1 使用json序列化对象gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitConfig.java
package site.zhourui.gulimall.order.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
再次发送消息测试
@Test void sendMessage() { // public void convertAndSend(String exchange, 交换机 // String routingKey, 路由键 // Object message, 发送的消息 // MessagePostProcessor messagePostProcessor) 消息序列化器 OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setName("sendMessageTest2"); orderReturnReasonEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("test_exchange","test.binding",orderReturnReasonEntity); log.info("消息发送完成"); }
测试结果
gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java
主启动类上需要加上@EnableRabbit
在需要监听消息的方法上加上@RabbitListener,并指明监听队列名称
@RabbitListener(queues = "test_queue") void receiveMessage(Object msg) { log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass()); }
监听结果
收到消息内容:(Body:'{"id":1,"name":"sendMessageTest2","sort":null,"status":null,"createTime":1639536120687}' MessageProperties [headers={__TypeId__=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.binding, deliveryTag=2, consumerTag=amq.ctag-OKodkqdQj7sbMy6xgVex5A, consumerQueue=test_queue])==>类型:class org.springframework.amqp.core.Message
5.4.6.2 接收消息内容并反序列化对象上一步接收到消息还需要手动封装为对应对象,只需要将序列化的类放在message后就可以自动封装为对应对象(消息接收对象必须与消息发送对象一致)
Message对象可以拿到消息的所有信息
@RabbitListener(queues = "test_queue") void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) { byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity); }
监听结果
收到消息内容:(Body:'{"id":1,"name":"sendMessageTest2","sort":null,"status":null,"createTime":1639539092604}' MessageProperties [headers={__TypeId__=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.binding, deliveryTag=2, consumerTag=amq.ctag-58WYA7sSHI64OWjRLJpc9w, consumerQueue=test_queue])==>内容OrderReturnReasonEntity(id=1, name=sendMessageTest2, sort=null, status=null, createTime=Wed Dec 15 11:31:32 CST 2021)
5.4.6.3 完整写法参数类型
1、Mcssage message:原生消息详细信息。头+体
2、T<发送的消息的类型>orderReturnReasonEntity content;3、Channel channel:当前传输数据的通道
3、Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息场景:
1)、订单服务启动多个;同一个消息,只能有一个客户端收到
2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java
@RabbitListener(queues = "test_queue") void receiveMessage3(Message msg, OrderReturnReasonEntity orderReturnReasonEntity, Channel channel) { log.info("信道:"+channel); byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity); }
5.4.6.4 验证多个消费者监听场景新建一个测试消息发送controller
gulimall-order/src/main/java/site/zhourui/gulimall/order/controller/TestSendMessageController.java
调用接口:http://localhost:9000/sendMessage?num=5发送多条消息
package site.zhourui.gulimall.order.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import site.zhourui.common.utils.PageUtils; import site.zhourui.common.utils.R; import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity; import java.util.Date; import java.util.Map; @RestController public class TestSendMessageController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/sendMessage") public R sendMany(@RequestParam("num") Integer num){ for (int i = 0; i
复制一个订单模块,并启动
调用消息发送接口发送10条消息
http://localhost:9000/sendMessage?num=10
结论:每个客户端都是接收的同一个队列,但是没有重复的消息
并且在此期间只创建了两个连接,确认了一个客户端只有一个连接的说法
@RabbitListener(queues={“hello-java-queue”})放在类上【作用:用来指定接收哪个队列的消息】
5.4.7.1 @RabbitListener与@RabbitHandler区别
@RabbitHandler:标在方法上【作用:重载处理不同类型的数据】- 作用域:
- RabbitListener可以标在类上也可以标在方法上
- RabbitHandler只能标在方法上
修改发送消息的接口,使其能够发送不同类型数据的消息
gulimall-order/src/main/java/site/zhourui/gulimall/order/controller/TestSendMessageController.java
package site.zhourui.gulimall.order.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import site.zhourui.common.utils.PageUtils; import site.zhourui.common.utils.R; import site.zhourui.gulimall.order.entity.OrderEntity; import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity; import java.util.Date; import java.util.Map; import java.util.UUID; @RestController public class TestSendMessageController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/sendMessage") public R sendMany(@RequestParam("num") Integer num){ for (int i = 0; i
gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java
package site.zhourui.gulimall.order.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import site.zhourui.gulimall.order.entity.OrderEntity; import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity; @Service @Slf4j @RabbitListener(queues = "test_queue") public class TestListener { // @RabbitListener(queues = "test_queue") // void receiveMessage(Object msg) { // log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass()); // } // @RabbitListener(queues = "test_queue") // void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) { // byte[] body = msg.getBody(); // MessageProperties messageProperties = msg.getMessageProperties(); // log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity); // } // @RabbitListener(queues = "test_queue") @RabbitHandler void receiveMessage3(Message msg, OrderReturnReasonEntity orderReturnReasonEntity, Channel channel) { // log.info("信道:"+channel); // byte[] body = msg.getBody(); // MessageProperties messageProperties = msg.getMessageProperties(); log.info("==>内容"+orderReturnReasonEntity); } @RabbitHandler void receiveMessage4(Message msg, OrderEntity orderEntity, Channel channel) { // log.info("信道:"+channel); // byte[] body = msg.getBody(); // MessageProperties messageProperties = msg.getMessageProperties(); log.info("==>内容"+orderEntity); } }
发送消息测试http://localhost:9000/sendMessage?num=10
同一个队列拿出不同类型的数据,并且可以封装为对应的对象,做不同的处理,只使用RabbitListener是达不到这个效果的,
但是如果接收消息与发送消息拿到的对象是自己将json封装的对象那么就这个功能用处就不大了
- 为什么不使用事务消息:保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback 确认模式(消息是否到达Broker消息代理)
- publisher returnCallback 未投递到 queue 退回模式(只要消息没有投递给指定的队列,就触发这个失败回调)
- consumer ack机制 (ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。)
5.5.2 ConfirmCallback (确认模式:发送端确认)- spring.rabbitmq.publisher-confirms=true
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
CorrelationData:用来表示当前消息唯一性。 - 消息只要被 broker 接收到就会执行 /confirm/iCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 /confirm/iCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
开启配置
rabbitmq: host: 192.168.157.128 port: 5672 virtual-host: / #开启发送端确认 publisher-/confirm/is: true
自定义RabbitTemplate,开启确认模式
package site.zhourui.gulimall.order.config; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { //设置确认回调 rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> { System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); }
5.5.3 returnCallback(回退模式:发送端确认)- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
- 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
开启配置
server: port: 9000 spring: datasource: username: root password: root url: jdbc:mysql://192.168.157.128:3306/gulimall_oms?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai driver-class-name: com.mysql.cj.jdbc.Driver cloud: nacos: discovery: server-addr: 127.0.0.1:8848 rabbitmq: host: 192.168.157.128 port: 5672 virtual-host: / #开启发送端确认 publisher-/confirm/is: true # 开启发送端消息抵达Queue确认 publisher-returns: true # 只要消息抵达Queue,就会异步发送优先回调returnfirm template: mandatory: true
自定义RabbitTemplate,开启回退模式
gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitConfig.java
package site.zhourui.gulimall.order.config; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { //设置确认回调 rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> { System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); } }
因为该模式需要消息代理投递给指定队列失败才会触发,所以此处我们故意将路由键写错,此次测试后将错误改回来
错误消息
Fail Message[(Body:’{“id”:1,“name”:“消息—0”,“sort”:null,“status”:null,“createTime”:1639576478075}’ MessageProperties [headers={TypeId=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]>replyCode[312]>replyText[NO_ROUTE]>exchange[test_exchange]>routingKey[test.binding222]
/confirm/i…correlationData[null]>ack:[true]>cause:[null]在消息发送的时候CorrelationData参数就是消息的唯一id
这个id在发送端确认时是可以拿到的,排查那些消息未成功抵达是可以用来排查(与接收到的存放在数据库的消息唯一id进行对比)
- 消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack用于肯定确认;broker将移除此消息
- basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject用于否定确认;同上,但不能批量
- 默认自动ack,消息被消费者收到,就会从broker的queue中移除
- queue无消费者,消息依然会被存储,直到消费者消费
- 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
目前存在的问题:
默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,此时会出现一个问题:
在接收消息这里打上断点,将第一个消息走完,这时模拟突发状况,比如服务器宕机,关掉服务器
关掉服务器后剩余的4个消息也都消失了,也没有经过消费端确认,相当于丢失了
为了解决这个问题,我们将自动ack确认改为手动确认,只有手动ack确认的消息才能被队列移除或者再次放入队列
开启手动ack机制
server: port: 9000 spring: datasource: username: root password: root url: jdbc:mysql://192.168.157.128:3306/gulimall_oms?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai driver-class-name: com.mysql.cj.jdbc.Driver cloud: nacos: discovery: server-addr: 127.0.0.1:8848 rabbitmq: host: 192.168.157.128 port: 5672 virtual-host: / #开启发送端确认 publisher-/confirm/is: true # 开启发送端消息抵达Queue确认 publisher-returns: true # 只要消息抵达Queue,就会异步发送优先回调returnfirm template: mandatory: true # 使用手动ack确认模式,关闭自动确认【消息丢失】 listener: simple: acknowledge-mode: manual
发送五个消息测试,就算客户端已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked
此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked
除非手动确认
package site.zhourui.gulimall.order.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import site.zhourui.gulimall.order.entity.OrderEntity; import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity; import java.io.IOException; @Service @Slf4j @RabbitListener(queues = "test_queue") public class TestListener { // @RabbitListener(queues = "test_queue") // void receiveMessage(Object msg) { // log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass()); // } // @RabbitListener(queues = "test_queue") // void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) { // byte[] body = msg.getBody(); // MessageProperties messageProperties = msg.getMessageProperties(); // log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity); // } // @RabbitListener(queues = "test_queue") @RabbitHandler void receiveMessage3(Message msg, OrderReturnReasonEntity orderReturnReasonEntity, Channel channel) { System.out.println("接收到消息:---"+orderReturnReasonEntity); byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); log.info("==>处理完成消息"+orderReturnReasonEntity.getName()); long deliveryTag = messageProperties.getDeliveryTag(); // public void basicNack(long deliveryTag, //channel内按顺序自增 // boolean multiple) //是否批量确认 try { //debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到 if(deliveryTag%2==0){ channel.basicAck(deliveryTag,false); //手动ack确认接收消息 System.out.println("签收了货物---"+deliveryTag); } } catch (IOException e) { e.printStackTrace(); } } // @RabbitHandler // void receiveMessage4(Message msg, // OrderEntity orderEntity, // Channel channel) { log.info("信道:"+channel); byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); // log.info("==>内容"+orderEntity); // } }
断点测试一个一个的放消息手动确认消息
debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到,这里测试可以放开断点,
此时只签收了货物—2,货物—4
还有三个未确认
关闭客户服务端,消息状态由unacked->ready
剩余的消息仍然可以继续签收
channel内按顺序自增
个人理解:相当于channel信道中消息的唯一id
上面为什么重启了就能再接收一个消息了?
之前的发送的五个消息,只接受偶数项消息(2,4),还剩3条消息,客户端重启后剩余消息在channel中又重新排序(1,2,3),随意再次接收到一个消息
5.5.4.3 退货package site.zhourui.gulimall.order.listener; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import site.zhourui.gulimall.order.entity.OrderEntity; import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity; import java.io.IOException; @Service @Slf4j @RabbitListener(queues = "test_queue") public class TestListener { // @RabbitListener(queues = "test_queue") // void receiveMessage(Object msg) { // log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass()); // } // @RabbitListener(queues = "test_queue") // void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) { // byte[] body = msg.getBody(); // MessageProperties messageProperties = msg.getMessageProperties(); // log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity); // } // @RabbitListener(queues = "test_queue") @RabbitHandler void receiveMessage3(Message msg, OrderReturnReasonEntity orderReturnReasonEntity, Channel channel) { System.out.println("接收到消息:---"+orderReturnReasonEntity); byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); log.info("==>处理完成消息"+orderReturnReasonEntity.getName()); long deliveryTag = messageProperties.getDeliveryTag(); // public void basicNack(long deliveryTag, //channel内按顺序自增 // boolean multiple) //是否批量确认 try { //debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到 if(deliveryTag%2==0){ channel.basicAck(deliveryTag,false); //手动ack确认接收消息 System.out.println("签收了货物---"+deliveryTag); }else { // public void basicNack(long deliveryTag, //channel内按顺序自增 // boolean multiple, //是否批量退货 // boolean requeue) //确认后是否重新入队 false丢弃 channel.basicNack(deliveryTag,false,false); //手动ack确认拒绝消息 // channel.basicReject(); System.out.println("拒接签收了货物---"+deliveryTag); } } catch (IOException e) { e.printStackTrace(); } } // @RabbitHandler // void receiveMessage4(Message msg, // OrderEntity orderEntity, // Channel channel) { log.info("信道:"+channel); byte[] body = msg.getBody(); MessageProperties messageProperties = msg.getMessageProperties(); // log.info("==>内容"+orderEntity); // } }
清除之前测试的消息,再次发送消息测试,
因为这里的requeue为false,所有的消息都被清除了
当requeue=true,deliveryTag奇数的消息被拒收的消息重新入队deliveryTag变为偶数,被接收
定时任务时效性问题
场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:spring的 schedule 定时任务轮询数据库
缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差
解决:rabbitmq的消息TTL和死信Exchange结合
6.2 消息的存活时间(TTL)- 消息的TTL(Time To Live)就是消息的存活时间。
- • RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者xmessage-ttl属性来设置时间,两者是一样的效果。
- 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
- Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
- 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
- 手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
基础版
交换机与队列一对一,一台路由器路由一个队列
升级版
只需要一台交换机绑定多个队列
6.6.2 实现 6.6.2.1 创建队列,交换机,绑定技巧容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下
- 1、第一次使用队列【监听】的时候才会创建
- 2、Broker没有队列、交换机才会创建
gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitMQConfig.java
package site.zhourui.gulimall.order.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class MyRabbitMQConfig { // @RabbitHandler // public void listen(Message message){ // System.out.println("收到消息:------>"+message); // } @Bean public Queue orderDelayQueue(){ HashMap
arguments = new HashMap<>(); Queue queue = new Queue("order.delay.queue",true,false,false,arguments); return queue; } @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue",true,false,false); return queue; } @Bean public Exchange orderEventExchange(){ TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false); return topicExchange; } @Bean public Binding orderCreateOrderBinding(){ Binding binding = new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); return binding; } @Bean public Binding orderReleaseOrderBinding(){ Binding binding = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); return binding; } } 启动订单服务后
队列
交换机
绑定关系
向延时队列发送一条消息
延时队列收到一条消息
一分钟后该消息变为死信,再将该消息转发给死信队列
死信队列收到消息
使用测试代码测试
发送消息
gulimall-order/src/main/java/site/zhourui/gulimall/order/web/HelloController.java
@Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping(value = "/test/createOrder") public String createOrderTest() { //订单下单成功 OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); orderEntity.setModifyTime(new Date()); //给MQ发送消息 rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity); return "ok"; }
接收消息
gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitMQConfig.java
@RabbitListener(queues = "order.release.order.queue") public void listen(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期订单消息,准备关闭订单:------>"+orderEntity.getOrderSn()); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); }
欢迎分享,转载请注明来源:内存溢出
- 作用域:
评论列表(0条)