1. RabbitMQ 工作原理2. RabbitMQ 七种消息收发方式
2.1 代码环境2.2 消息收发
2.2.1 Hello World2.2.2 Work queues2.2.3 Publish/Subscribe
2.2.3.1 Direct2.2.3.2 Fanout2.2.3.3 Topic2.2.3.4 Header2.2.3.5 小结 2.2.4 Routing2.2.5 Topics2.2.6 RPC2.2.7 Publisher /confirm/is
1. RabbitMQ 工作原理解释说明:
- 生产者(Producer):发布消息到 RabbitMQ中的交换机(Exchange)上交换机(Exchange):和生产者建立连接并接收生产者的消息,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定队列(Queue):Exchange将消息分发到指定的 Queue,Queue和消费者进行交互。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据路由(Routes):交换机转发消息到队列的规则消费者(Consumer):监听 RabbitMQ中的 Queue中的消息
2. RabbitMQ 七种消息收发方式注意:生产者,消费者和消息中间件很多时候并不在同一机器上;同一个应用程序既可以是生产者又是可以是消费者。
RabbitMQ官网介绍了如下七种消息分发的形式,下面逐一代码实现。代码可在文章最后查看。
2.1 代码环境SpringBoot : 2.5.7、RabbitMQ:3.9.13
application.properties 中配置 RabbitMQ的基本连接信息,如下:
server.port=8888 spring.rabbitmq.host=192.168.3.157 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
在 RabbitMQ中,所有的消息生产者提交的消息都会交由 Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的 Queue中。
2.2 消息收发 2.2.1 Hello World下面所有的生产者和消费者都在同一个工程中,方便测试
消息传播图如下:
这种消息分发采用的是默认的Exchange,在RabbitMQ的Web客户端中,可以看到RabbitMQ提供的交换机
定义队列:
@Configuration public class RabbitMQConfig { // 队列的名称 public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name"; @Bean Queue queue() { return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false); } }
消费者定义:
@Slf4j @Component public class ScorpiosConsumer { @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME) public void consume(String msg) { log.info("消费者收到的消息为:{}",msg); } }
消息发送:模拟发送请求来向RabbiMQ发送消息
@RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息,向RabbitMQ发送消息..."); rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios...."); log.info("-----------------------------------"); return "success"; } }
测试结果:浏览器输入:http://localhost:8888/send/message,看控制台日志输入
在上面的代码中,并没有创建Exchange,而是使用默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上,例如消息队列名为 scorpios_queue_name,则 routingkey为 scorpios_queue_name”的消息会被该消息队列接收。
正如官网介绍这种方式的一样:The simplest thing that does something,这种方式最为简单
2.2.2 Work queues说明:如果此处的生产者在另外一个工程中,只需要把Controller和RabbitMQConfig复制过去,就可以了。
这种方式的主要考虑的是:消息队列的消息如何被消费者消费
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。
消费者也可以配置各自的并发能力,进而提高消息的消费能力,消费者也可以配置手动 ack,来决定是否要消费某一条消息
先看并发能力的配置,如下:
@Slf4j @Component public class ScorpiosConsumer { @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME) public void consumeOne(String msg) { log.info("consumeOne消费者收到的消息为:{}",msg); } // 表示此消费者会创建5个线程来执行 @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5") public void consumeTwo(String msg) { log.info("consumeTwo消费者收到的消息为:{}",msg); } }
第二个消费者我配置了 concurrency为 5,此时,对于第二个消费者,将会同时存在5 个子线程去消费消息
启动项目,在 RabbitMQ后台也可以看到一共有 6个消费者。一个连接,具有6个Channel
此时,如果生产者发送 5条消息,就会一下都被消费掉
消息发送方式如下:
@Slf4j @RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息,向RabbitMQ发送消息..."); for (int i = 0; i < 5; i++) { rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios...." + i); } log.info("-----------------------------------"); return "success"; } }
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
可以看到,消息都被第二个消费者消费了。但需要注意,多试几次可以看到,消息也有可能被第一个消费者消费
下面来看一下,消费者开启手动 ack,这样可以自行决定是否消费 RabbitMQ发来的消息,配置手动 ack需要在配置文件中添加如下配置:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费代码如下:
@Slf4j @Component public class ScorpiosConsumer { @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME) public void consumeOne(Message message, Channel channel) throws IOException { log.info("consumeOne消费者收到的消息为:{}",message.getPayload()); // 确认消费消息 channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true); } @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5") public void consumeTwo(Message message, Channel channel) throws IOException { log.info("consumeTwo消费者收到的消息为:{},消费线程为:{}", message.getPayload(), Thread.currentThread().getName()); // 拒绝消费消息 channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true); } }
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息
2.2.3 Publish/Subscribe这种方式主要考虑的是:消息到达交换机后,如何转到消息队列
一个生产者(Producer),一个交换机(Exchange),多个消费者(Consumer),每一个消费者都有自己的一个队列(Queue)
生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的
需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ中 Exchange不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
这里交换机有四种选择,分别是:
Direct(直接)Fanout(扇出)Topic(主题)Header(标题) 2.2.3.1 Direct
DirectExchange的路由策略是将消息队列(Queue)绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上。
配置类:
@Configuration public class RabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 队列的名称 public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name"; @Bean DirectExchange directExchange(){ return new DirectExchange(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue queue() { return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false); } // 将队列与DirectExchange绑定,要指定routingkey @Bean Binding binding() { return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); } }
Binding其实是Exchange和Queue之间的桥梁,它告诉我们Exchange和哪个Queue进行了绑定关系
消费者:
@Slf4j @Component public class DirectConsumer { @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME) public void consume(String msg) { log.info("consume消费者收到的消息为:{}",msg); } }
发送者:
@Slf4j @RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息"); // 要添加routingkey参数 rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"direct","hello scorpios..."); return "success"; } }
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
在配置类中要把DirectExchange和Queue进行binding,并且要指定routingkey
同时,在发送者的代码中,要指定交换机的名字和routingkey
结合下面这张图再次理解下,交换机的类型为DirectExchange:
只有routingkey为orange时,消息才会转到Q1队列routingkey为black、green时,消息才会转到Q2队列
2.2.3.2 FanoutFanoutExchange的数据交换策略是把所有到达 FanoutExchange的消息转发给所有与它绑定的 Queue上,在这种策略中,routingkey将不起任何作用,FanoutExchange配置方式如下:
@Configuration public class FanoutRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 队列的名称 public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_one"; public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_two"; @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue queueOne() { return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false); } @Bean Queue queueTwo() { return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false); } // 将队列与FanoutExchange绑定 @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
上面创建 FanoutExchange,参数含义与创建 DirectExchange参数含义一致,然后创建两个 Queue,再将这两个 Queue都绑定到 FanoutExchange上。接下来创建两个消费者,如下:
@Slf4j @Component public class FanoutConsumer { @RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE) public void consumeOne(String msg) { log.info("consumeOne消费者收到的消息为:{}",msg); } @RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO) public void consumeTwo(String msg) { log.info("consumeTwo消费者收到的消息为:{}",msg); } }
发送者:
@Slf4j @RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息"); // routingkey 参数为null rabbitTemplate.convertAndSend(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,"hello scorpios... FanoutExchange"); return "success"; }} }
注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
FanoutExchange 交换机是将接收到的所有消息广播到它知道的所有队列中
来看下面这张图:
如果Exchange的绑定类型是Direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是Direct,但是它表现的就和Fanout有点类似了,就跟广播差不多,如上图所示。
2.2.3.3 TopicTopicExchange是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange中,Queue通过 routingkey绑定到 TopicExchange上,当消息到达 TopicExchange后,TopicExchange 根据消息的routingkey 将消息路由到一个或者多个Queue `上。
TopicExchange配置如下:
@Configuration public class TopicsRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 队列的名称 public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_xiaomi"; public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_huawei"; public static final String SCORPIOS_QUEUE_NAME_THREE = "scorpios_queue_name_phone"; @Bean TopicExchange topicExchange(){ return new TopicExchange(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue xiaomi() { return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false); } @Bean Queue huawei() { return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false); } @Bean Queue phone() { return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE,true,false,false); } // 将队列与TopicExchange绑定 @Bean Binding bindingXiaomi() { return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#"); } @Bean Binding bindingHuawei() { return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#"); } @Bean Binding bindingPhone() { return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#"); } }
创建 一个TopicExchange,三个 Queue,第一个 Queue用来存储和 xiaomi 有关的消息,第二个 Queue用来存储和 huawei 有关的消息,第三个 Queue 用来存储和 phone 有关的消息将三个 Queue分别绑定到 TopicExchange上,
第一个 Binding中的 xiaomi.# 表示消息的 routingkey凡是以 xiaomi开头的,都将被路由到名称为 xiaomi 的 Queue上第二个 Binding中的 huawei.# 表示消息的 routingkey凡是以 huawei开头的,都将被路由到名称为 huawei 的 Queue上第三个 Binding中的 #.phone.# 则表示消息的 routingkey中凡是包含 phone的,都将被路由到名称为 phone 的 Queue上
接下来针对三个 Queue创建三个消费者,如下:
@Slf4j @Component public class TopicsConsumer { @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE) public void consumeXiaomi(String msg) { log.info("consumeXiaomi消费者收到的消息为:{},匹配routingkey:xiaomi.#",msg); } @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO) public void consumeHuawei(String msg) { log.info("consumeHuawei消费者收到的消息为:{},匹配routingkey:huawei.#",msg); } @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE) public void consumePhone(String msg) { log.info("consumePhone消费者收到的消息为:{},匹配routingkey:#.phone.#",msg); } }
发送者:
@Slf4j @RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息"); rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.news","小米新闻,xiao.news"); rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.news","华为新闻,huawei.news"); rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.phone","小米手机,xiaomi.phone"); rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.phone","华为手机,huawei.phone"); rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"phone.news","手机新闻,phone.news"); return "success"; } }
根据 TopicsRabbitMQConfig中的配置,测试结果应该如下:
第一条消息将被路由到名称为 xiaomi 的 Queue上第二条消息将被路由到名为huawei 的 Queue上第三条消息将被路由到名为 xiaomi 以及名为 phone的 Queue上第四条消息将被路由到名为 huawei 以及名为 phone的 Queue上第五条消息则将被路由到名为 phone的 Queue上
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
2.2.3.4 HeaderHeadersExchange是一种使用较少的路由策略,HeadersExchange会根据消息的 Header将消息路由到不同的 Queue上,这种策略也和 routingkey无关,HeadersExchange配置如下:
@Configuration public class HeaderRabbitMQConfig { // 交换机的名称 public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name"; // 队列的名称 public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_name"; public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_age"; @Bean HeadersExchange headersExchange(){ return new HeadersExchange(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false); } @Bean Queue queueName() { return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false); } @Bean Queue queueAge() { return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false); } // 将队列与HeadersExchange绑定 @Bean Binding bindingName() { Mapmap = new HashMap<>(); map.put("name", "scorpios"); return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() { return BindingBuilder.bind(queueAge()).to(headersExchange()).whereAny("age").exist(); } }
这里主要关注下 Binding的配置上,第一个 bindingName方法中,whereAny表示消息的 Header中只要有一个 Header匹配上 map中的 key/value,就把该消息路由到名为 scorpios_queue_name_name的 Queue上,这里也可以使用 whereAll方法,表示消息的所有 Header都要匹配。whereAny和 whereAll实际上对应了一个名为 x-match 的属性。bindingAge中的配置则表示只要消息的 Header中包含 age,不管 age的值是多少,都将消息路由到名为 scorpios_queue_name_age的 Queue上
消费者:
@Slf4j @Component public class HeaderConsumer { @RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE) public void consumeName(String msg) { log.info("consumeName消费者收到的消息为:{}",msg); } @RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO) public void consumeAge(String msg) { log.info("consumeAge消费者收到的消息为:{}",msg); } }
发送者:
@Slf4j @RestController public class RabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/send/message") public String test(){ log.info("接收到客户端消息"); Message name = MessageBuilder.withBody("header exchange, scorpios_queue_name_name".getBytes()) .setHeader("name", "scorpios").build(); Message age = MessageBuilder.withBody("header exchange, scorpios_queue_name_age".getBytes()) .setHeader("age", "20").build(); rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,name); rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,age); return "success"; } }
创建并发送两条消息,两条消息具有不同的 header,不同 header的消息将被发到不同的 Queue中去
测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入
2.2.3.5 小结DirectExchange、TopicExchange都需要用到routingkeyFanoutExchange、HeadersExchange不需要routingkeyDirectExchange类型交换机,如果它绑定的多个队列的key如果都相同,虽然绑定类型是Direct,但是它表现和FanoutExchange有点类似,和广播差不多 2.2.4 Routing
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange后,根据 RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey即可,看下图理解下:
这种方式就是按照 routing key 去路由消息,可以参考上面DirectExchange、TopicExchange使用
2.2.5 Topics一个生产者,一个交换机,两个队列,两个消费者,生产者创建 TopicExchange并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写使用通配符,看下图理解下:
这种方式就是按照 routing key 去路由消息,可以参考上面TopicExchange使用
2.2.6 RPCRabbitMQ提供了RPC功能,原理图如下:
原理解释:
Client发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字Server从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to指定的回调队列中Client从回调队列中读取消息,就可以知道消息的执行结果
具体示例,下一篇实现~
2.2.7 Publisher Confirms在解决消息可靠性的问题时,有两种方式:事务和消息确认。
对于消息是否被成功消费,可以使用这种方式——消息确认机制。消息确认分为:自动确认和手动确认。
在上面的代码中,大部分都使用了自动确认。除了在介绍Work Queues方式时,消费者开启了手动 ack
这种方式很重要,后续单独研究吧~~
代码地址:https://github.com/Hofanking/springboot-rabbitmq-example
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)