文章目录博客首页:崇尚学技术的科班人
小肖来了
今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》
有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了
希望各位小伙伴们能够耐心的读完这篇文章
博主也在学习阶段,如若发现问题,请告知,非常感谢
同时也非常感谢各位小伙伴们的支持
- 1、死信队列
- 1.1、概念
- 1.2、死信来源
- 1.3、死信实战
- 1.3.1、代码架构图
- 1.3.2、TTL过期情况
- 1.3.3、队列达到最大长度情况
- 1.3.4、消息被拒情况
- 2、总结
- 死信:就是无法被消费的消息。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
- 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到队列中)。
- 消息被拒绝并且requeue = false
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class Consumer01 { public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_QUEUE = "dead_queue"; public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); // 死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); Mapmap = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key","lisi"); map.put("x-message-ttl",10000); // 普通队列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); // 死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); // 队列绑定 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{}); } }
- 最为复杂的就是消费者01,它需要进行 死信交换机绑定死信队列、普通交换机绑定普通队列、普通队列绑定死信交换机。
- 我们为了让消息不被消费,我们需要制造假死现象,也就是关闭消费者01。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成 channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{}); } }
3. 生产者
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.nio.charset.StandardCharsets; public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 单位是毫秒 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for(int i = 1; i < 11; i ++){ String message = "info" + i; // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes()); } } }
4. 测试结果
- 所有的消息在超过过期时间之后,全部转移到了死信队列中。
1. 消费者01
public class Consumer01 { public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_QUEUE = "dead_queue"; public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); // 死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); Mapmap = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key","lisi"); map.put("x-max-length",6); //map.put("x-message-ttl",10000); // 普通队列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); // 死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); // 队列绑定 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("Consumer01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{}); } }
- 这里我们将过期时间参数改为了队列最大长度
- 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是关闭消费者01。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成 channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{}); } }
- 消费者02和TTL过期情况下的一模一样
3. 生产者
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.nio.charset.StandardCharsets; public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 单位是毫秒 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for(int i = 1; i < 11; i ++){ String message = "info" + i; // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes()); } } }
- 我们将对应的设置过期时间注释掉
4. 测试结果
- 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变。
- 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信。
1. 消费者01
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class Consumer01 { public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_QUEUE = "dead_queue"; public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); // 死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); Mapmap = new HashMap<>(); map.put("x-dead-letter-exchange",DEAD_EXCHANGE); map.put("x-dead-letter-routing-key","lisi"); //map.put("x-max-length",6); //map.put("x-message-ttl",10000); // 普通队列 channel.queueDeclare(NORMAL_QUEUE,false,false,false,map); // 死信队列 channel.queueDeclare(DEAD_QUEUE,false,false,false,null); // 队列绑定 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ String msg = new String(var2.getBody(),"UTF-8"); if(msg.equals("info5")){ System.out.println("Consumer01控制台接收到的消息是:" + msg + ": 此消息被拒" ); channel.basicReject(var2.getEnvelope().getDeliveryTag(),false); }else{ System.out.println("Consumer01控制台接收到的消息是:" + msg); channel.basicAck(var2.getEnvelope().getDeliveryTag(),false); } }; channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{}); } }
- 这里我们将队列最大长度注释掉
- 我们还需要开启手动应答,因为不开启就不会存在消息被拒 的问题。
2. 消费者02
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); System.out.println("等待接收消息......"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("Consumer02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; // 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成 channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{}); } }
- 消费者02和队列达到最大长度情况下的一模一样
3. 生产者
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.nio.charset.StandardCharsets; public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 单位是毫秒 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for(int i = 1; i < 11; i ++){ String message = "info" + i; // 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes()); } } }
- 生产者和队列达到最大长度情况下的一模一样
4. 测试结果
- 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。
- 可见只有"info5"被作为死信。
- 如果觉得这篇文章对你有帮助的话,请给我一个五星好评呗。评论地址,感谢铁汁的支持!!!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)