了解完 单队列模式 之后,接下来开始交换机模式,交换机模式一共有四种:Fanout、headers、direct、topic,这里这介绍一种常用的定向direct和topic主题模式。
在实践代码逻辑之前,尽可能先了解rabbitmq的详情比较好,rabbitmq工作流程及模式详解。
- 依赖包
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
- yml
server:
port: 8081
#一个模块中不可有两个application文件,否则另一个会失效,多模块引用的时候尤其注意
spring:
rabbitmq:
host: 192.168.25.131
port: 5672
virtual-host: /
#手动ack消息,手动确认消息才算消费
listener:
simple:
acknowledge-mode: manual
username: admin
password: 123456
direct定向模式
- 创建交换机
/**
*功能描述: 创建交换机
* @author zhouwenjie
* @date 2022/5/6 0:36
* @param
* @return org.springframework.amqp.core.Exchange
*/
@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderEventExchange(){
//durable:是否持久化 autoDelete:是否自动删除
DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
return directExchange;
}
- 声明队列
/**
* 功能描述: 【订单相关】声明正常收取队列
*
* @param
* @return org.springframework.amqp.core.Queue
* @author zhouwenjie
*/
@Bean("order_release_queue")
public Queue orderReleaseQueue() {
Queue queue = new Queue("order_release_queue", true, false, false);
return queue;
}
- 绑定交换机和队列
@Bean
public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
}
指定路由key【routingKey】,发消息的时候带上路由key才能被交换机转发到指定队列。
- 创建测试接口
/**
- @author: zhouwenjie
- @description:
- @create: 2022-05-06 01:26
**/
@RestController
@RequestMapping("order")
public class controller {
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order){
//发送消息指定交换机和路由键
rabbitTemplate.convertAndSend("order_event_exchange","order.release.order", order);
return "ok";
}
}
- 测试结果
- 创建topic模式交换机,队列还是用前边的队列即可
/**
*功能描述: 创建交换机
* @author zhouwenjie
* @date 2022/5/6 0:36
* @param
* @return org.springframework.amqp.core.Exchange
*/
@Bean("order_topic_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderTopicExchange(){
//durable:是否持久化 autoDelete:是否自动删除
TopicExchange topicExchange = new TopicExchange("order_topic_exchange",true, false);
return topicExchange;
}
- 绑定交换机队列
@Bean
public Binding orderTopicBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_topic_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.topic.*").noargs();
}
这里说明一下* 和 #的区别,匹配单个用 “*”,多个用 #;
举例说明:
a.#可以匹配a、a.b、a.b.c;
a.~ 只能匹配a.b(*打不出来,用~代替了);
- 接口测试
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order){
rabbitTemplate.convertAndSend("order_topic_exchange","order.topic", order);
return "ok";
}
注意:在实验当中,如果发现用*和#测试发现怎么配置都能收到消息,可以去管理界面解除队列和交换机绑定的路由键关系,如图所示
/**
* @author: zhouwenjie
* @description: mq交换机队列初始化
* @create: 2022-05-06 00:23
**/
@Configuration
public class MyMQConfig {
/**
*功能描述: 创建交换机
* @author zhouwenjie
* @date 2022/5/6 0:36
* @param
* @return org.springframework.amqp.core.Exchange
*/
@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderEventExchange(){
//durable:是否持久化 autoDelete:是否自动删除
DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
return directExchange;
}
/**
*功能描述: 创建延迟队列
* @author zhouwenjie
* @date 2022/5/6 0:42
* @param
* @return org.springframework.amqp.core.Queue
*/
@Bean("order_delay_queue")
public Queue oderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
/**
* x-dead-letter-exchange: order_event_exchange
* x-dead-letter-routing-key: order.release.order
* x-message-ttl: 60000
*/
//配置队列的死信应该发送给哪个交换机,过期消息发给谁
arguments.put("x-dead-letter-exchange", "order_event_exchange");
//发送给交换机使用的路由key
arguments.put("x-dead-letter-routing-key", "order.release.order");
//队列消息变成死信的时间 10s
arguments.put("x-message-ttl", 10000);
//exclusive:是否排他 arguments:扩展参数
Queue queue = new Queue("order_delay_queue", true, false, false, arguments);
return queue;
}
/**
* 功能描述: 【订单相关】声明正常收取队列
*
* @param
* @return org.springframework.amqp.core.Queue
* @author zhouwenjie
*/
@Bean("order_release_queue")
public Queue orderReleaseQueue() {
Queue queue = new Queue("order_release_queue", true, false, false);
return queue;
}
/**
*功能描述: 绑定延迟队列和交换机
* @author zhouwenjie
* @date 2022/5/6 1:38
* @return
*/
@Bean
public Binding orderCreateOrderBinging(@Qualifier("order_delay_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.delay.order").noargs();
}
@Bean
public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
}
/**
*功能描述: 创建交换机
* @author zhouwenjie
* @date 2022/5/6 0:36
* @param
* @return org.springframework.amqp.core.Exchange
*/
@Bean("order_topic_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderTopicExchange(){
//durable:是否持久化 autoDelete:是否自动删除
TopicExchange topicExchange = new TopicExchange("order_topic_exchange",true, false);
return topicExchange;
}
@Bean
public Binding orderTopicBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_topic_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.topic.#").noargs();
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)