1.首先我们简单介绍一下RabbitMQ中生产者(provider)、交换机(exchange)、队列(queue)、消费者(consumer)的关系,如下图所示。
当消息抵达RabbitMQ代理的时候,它会进入为其设置的exchange上。exchange负责将它路由到一个或多个队列中,这个过程会根据exchange的类型、exchange和队列之间的bingding以及消息的routing key进行路由。
exchange的类型如下
- Default:这是代理创建的特殊exchange。它会将消息路由至名字与消息routing key相同的队列,所有的队列都会自动绑定至Default Exchange
- Direct:如果消息的routing key与队列的bingding key相同,那么消息将会路由到该队列上
- Fanout:不管routing key和binding key是什么,消息都将会路由到所有绑定队列上
- Headers:与Topic Exchange类似,只不过要基于消息的头信息进行路由,而不是routingkey
- Dead letter:捕获所有无法投递(也就是它们无法匹配所有已定义的Exchange和队列的binding关系)的消息
- Topic:如果消息的routing key与队列binding key(可能会包含通配符)匹配,那么消息将会路由到一个或多个这样的队列上,绑定规则如下:
*(星号)用来表示一个单词(必须出现的)
#(井号)用来表示任意数量(零个或多个)单词
例子:
队列Q1绑定键为 *.AA.* 队列Q2绑定键为 AA.#
消息1携带的路由键为 A.AA.B,那么队列Q1将会收到
消息2携带的路由键为AA.BB.CC,那么队列Q2将会收到
交换机的介绍就到此,下面开始撸代码:
1.首先我们创建两个springboot项目,一个rabbitmq-provider(生产者),一个rabbitmq-consumer(消费者)。
2.添加pom文件用到的依赖:
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-web
3.配置rabbitmq信息
server: port: 8088 spring: application: name: rabbitmq-provider #项目名称 rabbitmq: host: 127.0.0.1 #代理主机(默认为localhost) port: 5672 #代理端口(默认为5672) username: guest #访问代理所使用的用户名(可选) password: guest #访问代理所使用的密码(可选)
4.使用Driect Exchangech创建DirectConfig.java
@Configuration public class DirectConfig { @Bean public Queue testDirectQueue(){ return new Queue("TestDirectQueue",true); } @Bean DirectExchange testDirectExchange() { return new DirectExchange("TestDirectExchange",true,false); } @Bean Binding bindingDirect() { return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("TestDirectRouting"); } }
5.创建一个controller用于发送消息
@RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendDirectMessage") public String sendDirectMessage(){ Mapmap = new HashMap<>(); map.put("ID",System.currentTimeMillis()); map.put("message","测试"); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "success"; } }
然后运行项目,访问sendDirectMessage接口,访问成功过后我们去rabbitmq的后台管理界面上就可以看到这个队列了
6.现在我们创建rabbitmq-consumer项目,pom文件添加依赖:
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-web
7.配置rabbitmq信息
server: port: 8089 spring: application: name: rabbitmq-provider #项目名称 rabbitmq: host: 127.0.0.1 #代理主机(默认为localhost) port: 5672 #代理端口(默认为5672) username: guest #访问代理所使用的用户名(可选) password: guest #访问代理所使用的密码(可选)
8.创建消息接收监听类,DirectReceiver.java
@Component @RabbitListener(queues = "TestDirectQueue") public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString()); } }
启动项目,控制台可以看到:
然后我们在去rabbitmq管理界面查看,刚刚那条队列就被消费了
1.使用Topic Exchange创建TopicConfig.java
@Configuration public class TopicConfig { @Bean public Queue firstTopicQueue() { return new Queue("topicQueue.first"); } @Bean public Queue secondTopicQueue() { return new Queue("topicQueue.second"); } @Bean public Queue thirdlyTopicQueue() { return new Queue("topicQueue.thirdly.test"); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstTopicQueue和topicExchange绑定,而且绑定的键值为topicQueue.first //这样只要是消息携带的路由键是topicQueue.first,才会分发到该队列 @Bean Binding bindingExchange() { return BindingBuilder.bind(firstTopicQueue()).to(exchange()).with("topicQueue.first"); } //将secondTopicQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.* // 这样只要是消息携带的路由键是以topicQueue.*(任意字符),都会分发到该队列 @Bean Binding bindingExchange2() { return BindingBuilder.bind(secondTopicQueue()).to(exchange()).with("topicQueue.*"); } //将thirdlyTopicQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topicQueue.开头的,都会分发到该队列 @Bean Binding bindingExchange3() { return BindingBuilder.bind(thirdlyTopicQueue()).to(exchange()).with("topicQueue.#"); } }
2.创建推送消息接口
@GetMapping("/sendTopicMessage") public String sendTopicMessage(){ Mapmap = new HashMap<>(); map.put("ID",System.currentTimeMillis()); map.put("message","Topic1测试"); rabbitTemplate.convertAndSend("topicExchange", "topicQueue.first", map); return "success"; } @GetMapping("/sendTopicMessage2") public String sendTopicMessage2(){ Map map = new HashMap<>(); map.put("ID",System.currentTimeMillis()); map.put("message","Topic2测试"); rabbitTemplate.convertAndSend("topicExchange", "topicQueue.second", map); return "success"; } @GetMapping("/sendTopicMessage3") public String sendTopicMessage3(){ Map map = new HashMap<>(); map.put("ID",System.currentTimeMillis()); map.put("message","Topic3测试"); rabbitTemplate.convertAndSend("topicExchange", "topicQueue.thirdly.test", map); return "success"; }
3.启动项目 调用sendTopicMessage接口,去rabbitmq后台管理界面可以看到如下三个队列都有消息 回到TopicConfig.java配置文件可以看到三routingKey分别是 topicQueue.first、topicQueue.、topicQueue.#
所以第一个接口的routingKey为topicQueue.first时 所有的队列都会接收到消息
调用第二个接口sendTopicMessage2,看rabbitmq的管理界面,topicQueue.second队列和topicQueue.thirdly.test队列消息数变成了2,回到TopicConfig.java可以发现只有topicQueue. 和 topicQueue.# 符合sendTopicMessage2接口的routingKey:topicQueue.second
调用第三个接口sendTopicMessage3,看rabbitmq的管理界面,
topicQueue.thirdly.test队列的消息数变成了3,回到TopicConfig.java可以发现只有 topicQueue.# 符合sendTopicMessage3接口的routingKey:topicQueue.thirdly.test,这里也进一步用实例讲述了 *(星号) 和 #(井号)的区别
4.分别创建TopicReceiverFirst.java、TopicReceiverSecond.java、TopicReceiverThirdly.java 三个接收器 分别接收topicQueue.first、topicQueue.second、topicQueue.thirdly.test队列的消息
@Component @RabbitListener(queues = "topicQueue.first") public class TopicReceiverFirst { @RabbitHandler public void process(Map testMessage) { System.out.println("topicQueue.first消费者收到消息 : " + testMessage.toString()); } }
@Component @RabbitListener(queues = "topicQueue.second") public class TopicReceiverSecond { @RabbitHandler public void process(Map testMessage) { System.out.println("topicQueue.second消费者收到消息 : " + testMessage.toString()); } }
@Component @RabbitListener(queues = "topicQueue.thirdly.test") public class TopicReceiverThirdly { @RabbitHandler public void process(Map testMessage) { System.out.println("topicQueue.thirdly.test消费者收到消息 : " + testMessage.toString()); } }
5.运行项目控制台可以看到TopicReceiverFirst消费了1条消息,TopicReceiverSecond消费了2条消息,TopicReceiverThirdly消费了3条消息,跟之前发送到队列的消息数是完全对应的
1.使用Fanout Exchange创建FanoutConfig.java
@Configuration public class FanoutConfig { @Bean public Queue fanoutQueueA(){ return new Queue("fanoutQueue.A"); } @Bean public Queue fanoutQueueB(){ return new Queue("fanoutQueue.B"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange()); } }
2.创建推送消息接口
@GetMapping("/sendFanoutMessage") public String sendFanoutMessage(){ Mapmap = new HashMap<>(); map.put("ID",System.currentTimeMillis()); map.put("message","Fanout测试"); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "success"; }
3.启动项目调用sendFanoutMessage接口,然后查看rabbitmq后台界面可以发现fanoutQueue.A和fanoutQueue.B队列都收到了消息
4.创建FanoutReceiverA和FanoutReceiverB收听器
@Component @RabbitListener(queues = "fanoutQueue.A") public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { System.out.println("fanoutQueue.A消费者收到消息 : " + testMessage.toString()); } }
@Component @RabbitListener(queues = "fanoutQueue.B") public class FanoutReceiverB { @RabbitHandler public void process(Map testMessage) { System.out.println("fanoutQueue.B消费者收到消息 : " + testMessage.toString()); } }
运行项目可以在控制台发现都消费类消息
好了,剩下的交换机这里就不再介绍了,本文有参考https://blog.csdn.net/qq_35387940/article/details/100514134
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)