博客首页:崇尚学技术的科班人
小肖来了
今天给大家带来的文章是《RabbitMQ发布确认和交换机基础总结与实战》
这是RabbitMQ的发布确认和交换机基础总结与实战
希望各位小伙伴们能够耐心的读完这篇文章
博主也在学习阶段,如若发现问题,请告知,非常感谢
同时也非常感谢各位小伙伴们的支持
源码地址
文章目录- 1、发布确认
- 1.1、发布确认的引出
- 1.2、发布确认的策略
- 1.2.1、开启发布确认的方法
- 1.2.2、单个确认发布
- 1.2.3、批量确认发布
- 1.2.4、异步确认发布
- 1.2.5、如何处理异步未确认消息
- 1.2.6、以上3种发布确认的速度对比
- 2、交换机
- 2.1、Exchanges
- 2.1.1、概念
- 2.1.2、类型
- 2.1.3、无名exchange
- 2.2、临时队列
- 2.3、绑定(binding)
- 2.4、Fanout(发布订阅模式)
- 2.4.1、介绍
- 2.4.2、实战
- 2.5、Direct(路由模式)
- 2.5.1、介绍
- 2.5.2、实战
- 2.6、Topic
- 2.6.1、介绍
- 2.6.2、Topic的要求
- 2.6.3、Topic的匹配案例
- 2.6.4、实战
一个消息的持久化需要经历的步骤:
- 设置要求队列持久化。
- 设置要求队列中的消息必须持久化。
- 发布确认
- 如果缺少了发布确认的话,那么消息在磁盘上持久化之前会发生丢失,从而不能满足消息持久化的目的。
Channel channel = RabbitmqUtil.getChannel(); //开启发布确认 channel./confirm/iSelect();
- 发布确认默认是没有开启的,如果需要开启需要调用/confirm/iSelect,每当需要使用发布确认的时候,都需要调用该方法。
- 单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
- 该确认方式主要通过waitFor/confirm/is方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。
- 这种确认方式的最大的缺点就是:发布速度特别慢。
public static void /confirm/iMessageIndividually() throws Exception{ Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 进行单个发布确认 boolean flag = channel.waitFor/confirm/is(); if(flag){ System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); }1.2.3、批量确认发布
- 先发布一批消息然后一起确认。
- 缺点:当发生故障导致发布出现问题时,不知道那个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息。
public static void /confirm/iMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); // 批量处理消息的个数 int batchSize = 100; for (int i = 1; i <= MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 进行批量发布确认 if(i % batchSize == 0){ channel.waitFor/confirm/is(); System.out.println("批量处理消息成功"); } } long end = System.currentTimeMillis(); System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); }1.2.4、异步确认发布
原理
- 异步确认发布是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
代码
public static void /confirm/iMessageAsync() throws Exception{ Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); /confirm/iCallback ackCallback = (var1,var2)->{ System.out.println("已确认的消息" + var1); }; /confirm/iCallback nackCallback = (var1,var2)->{ System.out.println("未确认的消息" + var1); }; channel.add/confirm/iListener(ackCallback,nackCallback); for (int i = 1; i <= MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); }1.2.5、如何处理异步未确认消息
- 最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说ConcurrentSkipListMap
public static void /confirm/iMessageAsync() throws Exception{ Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); ConcurrentSkipListMap1.2.6、以上3种发布确认的速度对比map = new ConcurrentSkipListMap<>(); /confirm/iCallback ackCallback = (var1,var2)->{ if(var2){ ConcurrentNavigableMap longStringConcurrentNavigableMap = map.headMap(var1); longStringConcurrentNavigableMap.clear(); }else{ map.remove(var1); } String message = map.get(var1); System.out.println("已确认的消息是:" + message + " 已确认的消息tag:" + var1); }; /confirm/iCallback nackCallback = (var1,var2)->{ // 未确认的消息 String s = map.get(var1); System.out.println(s); System.out.println("未确认的消息" + var1); }; channel.add/confirm/iListener(ackCallback,nackCallback); long begin = System.currentTimeMillis(); for (int i = 1; i <= MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 1. 将消息保存到一个线程安全地队列中 map.put(channel.getNextPublishSeqNo(),message); } long end = System.currentTimeMillis(); System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); }
- 单个发布确认:同步等待确认,简单,但吞吐量非常有限。
- 批量确认发布:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪一条消息出现了问题。
- 异步确认发布:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
import com.rabbitmq.client.Channel; import com.rabbitmq.client./confirm/iCallback; import com.xiao.utils.RabbitmqUtil; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeoutException; public class /confirm/iMessage { public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception { // 单个发布确认 /confirm/iMessageIndividually(); // 单个确认发送1000条消息所消耗的时间是680ms // 批量发布确认 /confirm/iMessageBatch(); //批量确认发送1000条消息所消耗的时间是112ms //异步发布确认 /confirm/iMessageAsync(); // 异步确认发送1000条消息所消耗的时间是41ms // 异步确认发送1000条消息所消耗的时间是33ms } public static void /confirm/iMessageIndividually() throws Exception{ Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 进行单个发布确认 boolean flag = channel.waitFor/confirm/is(); if(flag){ System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); } public static void /confirm/iMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); // 批量处理消息的个数 int batchSize = 100; for (int i = 1; i <= MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 进行批量发布确认 if(i % batchSize == 0){ channel.waitFor/confirm/is(); System.out.println("批量处理消息成功"); } } long end = System.currentTimeMillis(); System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); } public static void /confirm/iMessageAsync() throws Exception{ Channel channel = RabbitmqUtil.getChannel(); String QUEUE_NAME = UUID.randomUUID().toString(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel./confirm/iSelect(); ConcurrentSkipListMap2、交换机map = new ConcurrentSkipListMap<>(); /confirm/iCallback ackCallback = (var1,var2)->{ if(var2){ ConcurrentNavigableMap longStringConcurrentNavigableMap = map.headMap(var1); longStringConcurrentNavigableMap.clear(); }else{ map.remove(var1); } String message = map.get(var1); System.out.println("已确认的消息是:" + message + " 已确认的消息tag:" + var1); }; /confirm/iCallback nackCallback = (var1,var2)->{ // 未确认的消息 String s = map.get(var1); System.out.println(s); System.out.println("未确认的消息" + var1); }; channel.add/confirm/iListener(ackCallback,nackCallback); long begin = System.currentTimeMillis(); for (int i = 1; i <= MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 1. 将消息保存到一个线程安全地队列中 map.put(channel.getNextPublishSeqNo(),message); } long end = System.currentTimeMillis(); System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息所消耗的时间是" + (end - begin) + "ms"); } }
- 在这一部分中,我们将做一些完全不同的事情 – 我们将消息传达给多个消费者。这种模式成为“发布/订阅模式”。这里需要使用到交换机。
- RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。
- 相反,生产者只能将消息发送到交换机。
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
- 我们在发送消息时,""表示的就是默认的无名队列。
- 消息能路由发送到队列中其实是由routingKey绑定key指定的。
- 临时队列:一个具有随即名称的队列。一旦我们断开了连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();2.3、绑定(binding)
- 绑定其实是交换机和队列之间的桥梁。它能够标识哪个交换机和哪个队列进行了绑定关系。
-
它是将接收到的所有消息广播到它知道的所有队列中。
-
系统有默认的的Fanout交换机类型
- 交换机转发一条消息,其所绑定的所有队列都可以接收到消息。
1. 两个消费者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogs01 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 声明一个临时队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接收消息"); DeliverCallback deliverCallback = (var1,var2)->{ System.out.println("ReceiveLogs01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume(queueName,true,deliverCallback,var1->{}); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogs02 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 声明一个临时队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接收消息"); DeliverCallback deliverCallback = (var1,var2)->{ System.out.println("ReceiveLogs02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume(queueName,true,deliverCallback,var1->{}); } }
2. 一个生产者
import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.util.Scanner; public class EmitLog { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); Scanner sc = new Scanner(System.in); while(sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8")); System.out.println("成功发送消息:" + message); } } }
3. 测试结果
生产者
消费者
- 所以在fanout模式下,所有队列都可以收到消息。
- 直接交换机和fanout交换机的差别在于RoutingKey的绑定上,它绑定的的多个队列的key一般是不同的,如果是相同的,那么它表现得就和fanout有点类似。
队列和交换机的绑定关系
1. 生产者
import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.util.Scanner; public class DirectLogs { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); Scanner sc = new Scanner(System.in); while(sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8")); System.out.println("成功发送消息:" + message); } } }
2. 两个消费者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 声明队列 channel.queueDeclare("console",false,false,false,null); // 进行绑定 channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); System.out.println("等待接收消息"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("ReceiveLogsDirect01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume("console",true,deliverCallback,var1->{}); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 声明队列 channel.queueDeclare("disk",false,false,false,null); // 进行绑定 channel.queueBind("disk",EXCHANGE_NAME,"error"); System.out.println("等待接收消息"); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("ReceiveLogsDirect02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); }; channel.basicConsume("disk",true,deliverCallback,var1->{}); } }
3. 测试结果
- 当生产者进行发送消息的时候,它会优先进行RoutingKey的比较,然后才会发送给相应的队列。
之前我们Fanout可以将所有消息发送到所有队列,direct可以将消息发送到某个队列。但我们假设我们当前有3个队列,我们想只发送消息到其中的两个队列,那么这就需要Topic。
2.6.2、Topic的要求- topic的RoutingKey不能随意写,它必须是一个单词列表,以点号隔开。
- *(星号)可以代替一个单词。
- #(井号)可以代替零个或多个单词
- quick.orange.rabbit :被队列Q1、Q2接收到
- lazy.orange.elephant:被队列Q1、Q2接收到
- quick.orange.fox:被队列Q1接收到
- lazy.brown.fox:被队列Q2接收到
- lazy.pink.rabbit:虽然满足两个绑定但只被队列Q2接收一次。
- quick.brown.fox:不匹配任何绑定不会被任何队列接收到会被丢弃。
- quick.orange.male.rabbit:是四个单词不匹配任何绑定会被丢弃
- lazy.orange.male.rabbit:是四个单词当匹配Q2
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了。
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了。
1. 两个消费者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q1"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接受消息....."); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("ReceiveLogsTopic01控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); System.out.println("接收队列:" + queueName + " 接受的键:" + var2.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true,deliverCallback,var1->{}); } }
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xiao.utils.RabbitmqUtil; public class ReceiveLogsTopic02 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitmqUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q2"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#"); System.out.println("等待接受消息....."); DeliverCallback deliverCallback = (var1, var2)->{ System.out.println("ReceiveLogsTopic02控制台接收到的消息是:" + new String(var2.getBody(),"UTF-8")); System.out.println("接收队列:" + queueName + " 接受的键:" + var2.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true,deliverCallback,var1->{}); } }
2. 生产者
import com.rabbitmq.client.Channel; import com.xiao.utils.RabbitmqUtil; import java.util.HashMap; import java.util.Map; public class EmitLogTopic { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitmqUtil.getChannel(); Mapmap = new HashMap<>(); map.put("quick.orange.rabbit","被队列Q1、Q2接收到"); map.put("lazy.orange.elephant","被队列Q1、Q2接收到"); map.put("quick.orange.fox","被队列Q1接收到"); map.put("lazy.brown.fox","被队列Q2接收到"); map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次。"); map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃。"); map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); map.put("lazy.orange.male.rabbit","是四个单词当匹配Q2"); for(String key : map.keySet()){ String message = map.get(key); channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8")); System.out.println("发送的消息是:" + message); } } }
3. 测试结果
1. 生产者
2. 消费者
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)