MQ(Message Queue)就是消息队列的意思,遵循先进先出的实现规则,其实就是一个队列而已,只不过队列中存储的是消息 (message)。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ消息中间件,负责接收和发送消息,可看做一个快递站点,负责消息的存储和转发发。
故在安装RabbitMQ之前,需要安装Erlang语言环境,使之RabbitMQ在安装后可以正常运行。
- **Publisher **
生产者,也就是发送消息的用户端。
- Exchange
交换机,接收生产者发送的消息,并将这些消息交由绑定的队列中。
- Queue
消息队列,就是存储发送给消费者的消息,可以存在一个或多个消息队列,遵循先进先出的原则。
- Consumer
消费者,就是接收处理生产者发送的消息。
- Connection
消费者/生产者与Broker之间的TCP链接关系。
-
Channel
信道,TCP中的虚拟连接,Connection只有一个,但可以生成多个Channel信道连接。
-
Binding
绑定,就是将交换机和消息队列之间继续绑定。
- Virtual Host
虚拟主机。
- Broker
代表消息队列和交换机的整体。
RabbitMQ 功能- 流量消峰
就是将用户所有的请求转接到RabbitMQ,由RabbitMQ对请求进行消峰处理,所谓消峰就是将请求进行排队,从而限制了用户对系统的访问量,防止系统访问量过大,可能出现宕机风险。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jXh2lvZr-1650705153746)(http://www.kaotop.com/file/tupian/20220425/1648472052863-28472fc4-4392-4d52-bb4f-d048a279f68c.png)]
- 应用解耦
简单理解就是在系统与系统内部的主要主要分支之间添加一个中间部件,RabbitMQ,减少系统之间的耦合度。
低耦合高内聚:就是类内部关系之间越紧密越好,类与类之间的关系越稀疏越好。
- 异步处理
异步处理,我现在当前的理解,就是同时调用多个任务时,将其交由RabbitMQ,进行异步的同时处理。
RabbitMQ四大核心功能:
生产者、消费者、交换机、队列
进入Erlang官网:https://www.erlang.org/downloads
Windows平台- 进入Erlang官网,并点击Download按钮
- 选择Windows版本
- 傻瓜式安装
- 进入Erlang官网,并点击Download按钮
- 点击下载Erlang源码文件
- 将下载好的源码文件,就是所得到的压缩包通过Xshell上传的服务器
- 在服务器端进行解压安装
#解压文件
tar zxvf ErlangFileName -C /usr/local
#进入解压后的文件夹
cd /usr/local/erlang
#文件安装
./configure --prefix /usr/local/lib64
make
make install
- 配置环境变量
#添加环境变量
echo 'export PATH=$PATH:/usr/local/erlang/bin/' >> /etc/profile
#刷新配置文件
source /etc/profile
RabbitMQ安装
Windows平台
直接打开RabbitMQ.exe的进行傻瓜式安装。
- 进入RabbitMQ下载官网
- 将下载后的压缩包上传到服务器
- 解压并安装
#解压压缩包
xz -d rabbitmq-server-generic-unix-3.9.14.tar
tar zxvf rabbitmq-server-generic-unix-3.9.14.tar.xz -C /usr/local
#重命名
mv /usr/lcoal/rabbitmq_server-3.9.14 rabbitmq
#配置环境变量
echo 'export PATH=$PATH:/usr/local/rabblitmq/sbin' >> /etc/profile
#更新配置
source /etc/profile
#启动停止rabbitmq
rabbitmq-server -detached #启动
rabbitmqctl stop #停止
#添加插件
rabbitmq-plugins enable rabbitmq_management
- 若使用的是云服务器,需要开放端口5672或15672
#添加指定需要开放的端口:
firewall-cmd --add-port=15672/tcp --permanent
#重载入添加的端口:
firewall-cmd --reload
#查询指定端口是否开启成功:
firewall-cmd --query-port=15672/tcp
用户名和密码皆为guest
RabbitMQ 基本使用 页面介绍
直接使用网页翻译了,可能会有误差,酌情查看。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x7KVZWZR-1650705153773)(http://www.kaotop.com/file/tupian/20220425/1648354800207-3abe11e5-084a-48f6-9fe6-6d3398c258c2.png)]
-
用户角色描述
- 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行 *** 作。
- 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
- 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
- 添加用户
- 设置用户权限
进入用户详情页
修改用户相关信息
- 引用先关依赖
#引用Rabbit依赖
<dependency>
<groupId>org.springframework.amqpgroupId>
<artifactId>spring-rabbitartifactId>
dependency>
-
配置application配置文件并创建配置类
- 配置application配置文件
#RabbitMq配置
rabbitmq:
#连接地址
host: 192.168.137.1
#连接端口
port: 15672
#连接用户信息-用户名和密码
username: root
password: root
- 创建配置类
- **Broker**: 它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, - **Exchange**:消息交换机,它指定消息按什么规则,路由到哪个队列。 - **Queue**:消息的载体,每个消息都会被投到一个或多个队列。 - **Binding**:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. - **Routing **Key:路由关键字,exchange根据这个关键字进行消息投递。 - **vhost**:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 - **Producer**:消息生产者,就是投递消息的程序. - **Consumer**:消息消费者,就是接受消息的程序. - **Channel**:消息通道,在客户端的每个连接里,可建立多个channel.
- 以Fanout模式为例
在实际情况下,消息通过前端传递到controller层,通过对应的controller发送到交换机,再由交换机分发消息到队列,由消费者从队列中取出消息。
Fanout 广播模式,将消息发送到绑定交换机的所有队列。
- 创建Fanout配置文件
@Configuration
public class FanoutConfig {
/**声明交换机名称*/
private static final String EXCHANGE_NAME = "FanoutExchange";
/**声明队列名称*/
private static final String QUEUE_A = "Queue_A";
private static final String Queue_B = "Queue_B";
/**声明交换机*/
@Bean
public FanoutExchange fanoutExchange (){
/*
* 参数列表:
* 1.交换机名称
* 2.是否持久化
* 3.是否自动删除
* 4.其它参数
* */
return new FanoutExchange(EXCHANGE_NAME,false,false,null);
}
/**声明交换机A与B*/
@Bean
public Queue queue_A (){
/*
* 参数列表:
* 1.队列名称
* 2.是否持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
* */
return new Queue(QUEUE_A,false,false,false,null);
}
@Bean
public Queue queue_B (){
/*
* 参数列表:
* 1.队列名称
* 2.是否持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
* */
return new Queue(Queue_B,false,false,false,null);
}
/**绑定交换机与队列*/
@Bean
public Binding bindingQueue_A (){
/*
* 绑定队列与交换机
* */
return BindingBuilder.bind(queue_A()).to(fanoutExchange());
}
@Bean
public Binding bindingQueue_B (){
/*
* 绑定队列与交换机
* */
return BindingBuilder.bind(queue_B()).to(fanoutExchange());
}
}
- 创建生产者与消费者
@Component
//@Controller
public class FanoutController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**声明交换机名称*/
private static final String EXCHANGE_NAME = "FanoutExchange";
/**
* 发送消息
* */
@RequestMapping("/fanoutSend")
@ResponseBody
public String fanoutSend (){
//声明消息体
String message = "发送的消息体";
/*
* 参数列表:
* 1.交换机名称
* 2.routingKey
* 3.消息体
* */
rabbitTemplate.convertAndSend(EXCHANGE_NAME,"",message);
System.out.println("消息发送成功");
return "发送成功";
}
}
@Component
public class FanoutConsumer {
/**声明队列名称*/
private static final String QUEUE_A = "Queue_A";
private static final String Queue_B = "Queue_B";
//该静态信息,可以通过一个静态配置类完成
@RabbitListener(queues = QUEUE_A)
public void Queue_A(Message message){
String queueMessage = new String(message.getBody());
System.out.println("队列A接收的消息体:"+queueMessage);
}
@RabbitListener(queues = Queue_B)
public void Queue_B (Message message){
String queueMessage = new String(message.getBody());
System.out.println("队列B接收的消息体:"+queueMessage);
}
}
创建测试文件,直接调用FanoutController中的 fanoutSend方法或者通过客户端发送请求。
如果消费者在处理消息时,突然挂掉,这样可能会出现**消息缺失,**故RabbitMQ会支持消息应答机制,也就是当处理消息的消费者突然挂机,RabbitMQ会将该消息交由另一个消费者处理。当消费者将消息处理完毕,会将其告诉RabbitMQ,可以将处理好的消息删除。
在RabbitMQ中消息没有时间期限,只有在消费者处理好消息后,并传递信息告知RabbitMQ后,才会将消息删除。
开启手动应答:在生产者端代码不变,消费者端设置开启手动回应模式。
消息归队
也就是消息应答机制,确保队列中的消息不丢失。
/**
* 参数列表:
* 1.指定队列名
* 2.是否自动处理,true表示自动处理,false表示手动处理
* 3.消费者处理未成功的函数回调
* 4.消费者处理成功的函数回调
*/
channel.basicConsume(ConnectUtil.QUEUE_NAME,Boolean.TRUE,deliverCallback,cancelCallback );
手动应答
手动应答模式的好处就是,可以批量处理并减少网络拥堵。
/**
* deliveryTag:表示该详细的Key值
* multiple:表示是否批量化,默认为false
* requeue:被拒绝是否重新入队
*/
//表示肯定
channel.basicAck(deliveryTag,multiple);
//表示否定
channel.basicNack(deliveryTag,multiple,requeue);
channel.basicReject(deliveryTag,requeue);
//basicReject与basicAck相比较,basicReject只能处理一条信息,basicAck能处理多条信息。
自动应答
消息持久化 持久化概念/实现消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被 *** 作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
建议使用手动应答模式。
就是为防止RabbitMQ服务突然停掉,导致消息丢失,故我们要对消息进行持久化。具体就是对消息进行持久化、对队列进行持久化两步 *** 作。
- 队列持久化
所创建的队列在未声明的情况下,都是非持久化的。若要完成队列持久化,应在创建队列时,声明参数列表。
若前队列非持久化,后将其队列持久化,即QueueName,应将前队列删除,后重新创建队列。
//队列是否持久化
boolean persistence = true;
channel.queueDeclare(QUEUE_NAME,persistence,false,false,null);
- 消息持久化
实现消息持久化,在发送消息时,添加参数MessageProperties.PERSISTENT_TEXT_PLAIN,也就是消息属性,持久化BASIC。
//实现消息持久化,添加MessageProperties.PERSISTENT_BASIC参数,标记持久化
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
不公平分发
在RabbitMQ中的分发机制是轮询分发,也就是按顺序分发而已。所谓不公平分发就类似于设置权重值,将原有的公社平均分配机制,给为私有制,也就是能者多劳多得类似的概念,使用不公平分发机制,可能是因为消费者处理消息 的性能不同。
欲取值 :欲取值简单理解就是信道确认中通道上允许未确认消息的数量的最大数量,就是在不公平分发机制下对消费者所分发的消息最大数量。
在消费者中设置预期值,也就是实现不公平分发机制。
//通过信道设置参数basicQos传递参数,也就是欲取值。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
发布确认策略
发布确认原理
当消息和队列持久化开启,也就是讲消息写入底层磁盘中,将成功写入消息,也就是确认消息发布给消费者。即就是反确认机制。
//开启发布确认的方法
channel.confirmSelect();
发布确认策略
单个发送
一种简单的发布确认方式,一个消息在确认发布之后,排在后面的消息才能继续才能发布,也就是通过channel.waitForConfirms();
,该方法在消息被确认之后或者消息超出时间限制,会抛出异常 InterruptedException。
使用单消息发布确认策略,其短板就是发布速度较慢(发布一条消息并确认后,才能发布另一条消息),因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。
也就是在发布确认之后,其实就是返回设置,也就是确认消息是否发送成功。
//开启发布确认设置
channel.confimSelect();
//单消息确认发布策略——>!!!!!
channel.waitForConfirms();
批量发布
所谓批量确认发布,如同字面意思理解就是在批量发布消息后确认消息。批量发布相较于单个发送确认,其性能较高、提高吞吐量,但其缺点是倘若某个消息发送出现问题,批量发布不能细致确认是哪个消息发布出现的问题。
批量发布还是使用channel.waitForConfirms();
方法,只不过要在批量发送消息后使用该方法。
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,
他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,
下面就让我们来详细讲解异步确认是怎么实现的。
注意消息监听处理是异步的,也就是在消息发送后即使失败也不会立即处理;所以可以使用并行队列形式,就是在出现发送确认失败,重新发送。
相较于其它两种信息消息发送,异步发布处理有最佳性能和资源使用。
//发布确认开启
channel.confirmSelect();
//队列是否持久化
boolean persistence = true;
channel.queueDeclare(QUEUE_NAME,persistence,false,false,null);
//创建hash表,该表线程安全并有序
ConcurrentSkipListMap<Long,String> concurrentSkipListMap=new ConcurrentSkipListMap();
//创建消息确认失败和消息确认成功的回调函数
//消息发送成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
//消息发送成功回调-将存储在Map集合中的消息移除
//获取存储的Map集合,并移除其发送成功的消息
//若当前回调函数参数multiple,也就是清除多个还是单个
if (multiple){
//清除部分信息-也就是批量清除信息
//通过headMap方法返回小于当前deliveryTag的集合
ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag);
concurrentNavigableMap.clear();
}else {
//清除当前信息
concurrentSkipListMap.remove(deliveryTag);
}
}
//消息发送失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
//消息发送失败回调-将存储在Map集合中的消息重新发送
String message = concurrentSkipListMap.get(deliveryTag);
System.out.println("发布未确认的消息:"+message);
};
//使用消息监听器,监听消息消息是否发送成功,也就是确认消息是否要重发。
//消息监听器,要在消息发送之前声明。
//消息监听器,由信道创建
channel.addConfirmListener(ackCallback,nackCallback);
//发送并存储消息
concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
高级发布确认
当RabbitMQ出现宕机之后,导致消息的丢失。也就是生产者发送消息给交换机,或者由交换机分发消息给队列,若交换机或者队列为确认应答,则将其消息进行缓存。
生产者发送消息到Broker,Broker会向生产者传递一个ACK,根据这个ACK判断这个消息是否发送成功。
![RabbitMQ简介-高级发布确认.png](http://www.kaotop.com/file/tupian/20220425/b29194cb092fa97cd4772e803f3dee5d.png#clientId=ub656f365-6c02-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uf98b2989&margin=[object Object]&name=RabbitMQ简介-高级发布确认.png&originHeight=214&originWidth=843&originalType=binary&ratio=1&rotation=0&showTitle=false&size=43225&status=done&style=none&taskId=uc2175779-9da0-42c3-9bca-e6c2e8c085b&title=)
确认高级机制,就是监听消息到达交换机之间的过程,确认交换机是否接收到消息,接收到消息则将缓存中的消息删除。
- 添加配置文件和配置类
spring:
rabbitmq:
#ip地址
host: localhost
#端口号
port: 5672
#用户名即密码
username: root
password: root
#确认发布确认- 默认值none(也就是禁止发布确认)、correlated(也就是发布确认,也就是消息发送成功后,交换机触发回调方法)
publisher-confirm-type: correlated
@Configuration
public class RabbitMQConfig {
/**交换机名称*/
private static final String EXCHANGE_NAME = "Confirm_Exchange";
/**队列名称*/
private static final String QUEUE_NAME = "Confirm_Queue";
/**绑定的由路Key*/
private static final String ROUTING_KEY = "Confirm_RoutingKey";
/**声明交换机*/
@Bean
public DirectExchange ConfirmExchange (){
/*
* 参数列表:
* 1.交换机名称
* 2.是否持久化
* 3.是否自动删除
* 4.其它参数
* */
return new DirectExchange(EXCHANGE_NAME,false,false,null);
}
/**声明队列*/
@Bean
public Queue ConfirmQueue(){
/*
* 参数列表:
* 1.队列名称
* 2.是否持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
* */
return new Queue(QUEUE_NAME,false,false,false,null);
}
/**绑定交换机与队列*/
@Bean
public Binding queueBindingExchange (){
return BindingBuilder.bind(ConfirmQueue()).to(ConfirmExchange()).with(ROUTING_KEY);
}
}
- 回调函数类
出现消息宜异常时对消息的处理,也就是通过实现ConfirmCallBack接口,确认消息是否到达交换机,也就是确认消息是否被交换机处理,也就是对于队列出现消息接收不到的情况处理。
1. 实现ConfirmCallBack类
2. 实现confirm方法
3. 将是实现的方法类注入到RabbitTemplate类中
@Component
public class CallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**类注入,也就是指定当前的ConfirmCallBack*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 参数列表:
* 1. 消息的相关数据
* 2.交换机是否接收到消息
* 3.交换机接收消息失败的原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息的详细信息:"+correlationData.toString());
System.out.println("消息是否处理成功:"+b);
System.out.println("消息处理失败的原因:"+s);
}
}
- 生产者与消费者
@Component
public class Publisher {
private static final String EXCHANGE_NAME = "Confirm_Exchange";
private static final String ROUTING_KEY = "Confirm_RoutingKey";
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMessage")
@ResponseBody
public void sendMessage (){
/*
* 参数列表:
* 1.交换机名称
* 2.RoutingKey
* 3.消息体
* */
rabbitTemplate.convertAndSend(EXCHANGE_NAME,ROUTING_KEY,"发送的消息体-Confirm 发布确认高级");
}
}
@Component
public class Consumer {
/**队列名称*/
private static final String QUEUE_NAME = "Confirm_Queue";
@RabbitListener(queues = QUEUE_NAME)
public void newsConsumption (Message message){
System.out.println("接收的消息体:"+message.toString());
}
}
交换机将消息发送到队列,可能出现未知原因导致队列接收不到消息,从而导致消息丢失,但未通知消息的生产者;故为解决该问题,可以通过设置Mandatory参数,将队列是否成功接收到消息的状态返回到消息的生产者。
publisher-returns与 template.mandatory的区别就是消息是否会丢弃。
- 开启返回模式配置
spring:
rabbitmq:
#确认开启返回模式->消息未被队列接收时,直接返回
publisher-returns: true
#也是开启返回模式->消息未被队列接收时,直接返回或者直接丢弃
template:
mandatory: true
- 实现接口RabbitTemplate.ReturnsCallback
消息的生产者与消费者的编写与以上类似。
@Component
public class CallBack implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**类注入,也就是指定当前的ConfirmCallBack*/
@PostConstruct
public void init(){
//注入当前类
rabbitTemplate.setConfirmCallback(this);
//开启返回模式-> true 表示如果该消息无法进行由路时,将其该消息返回给消费者 | false
rabbitTemplate.setMandatory(true);
//设置消息退回之后,交由当前消息生产者处理
rabbitTemplate.setReturnsCallback(this);
}
/**
* 参数列表:
* 1. 消息的相关数据
* 2.交换机是否接收到消息
* 3.交换机接收消息失败的原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息的详细信息:"+correlationData.toString());
System.out.println("消息是否处理成功:"+b);
System.out.println("消息处理失败的原因:"+s);
}
/**
* 实现ReturnsCallback接口的returnedMessage方法
* */
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("返回模式,所返回的消息:"+returnedMessage.toString());
}
交换机备份
交换机备份,顾名思义解释将交换机备份,类似于 "影分身"的样子,当交换机收到一条不可由路的消息后,会将该消息交由备份的交换机处理,由该交换机进行处理转发和处理,通常该交换机的类型是Fanout。
也就是可以通过设置** alternate_exchange** 参数,实现交换机备份,也就是在声明交换机的时候同时告知其备份交换机。
使用备份交换机时,返回模式将失效,也就是说就算开启了返回模式,只要使用了备份交换机,其消息回交由备份交换机处理。
1. 通过Map集合设置备份交换机,并将Map集合传递到声明的交换机的其他参数中
1. 通过ExchangBuilder类,创建并声明交换机,且备注其备份交换机。
1.配置文件
@Configuration
public class RabbitMQConfig {
/**交换机名称*/
public static final String EXCHANGE_NAME = "Confirm_Exchange";
/**备份交换机名称*/
public static final String BACKUP_EXCHANGE_NAME = "Backup_Exchange";
/**队列名称*/
public static final String QUEUE_NAME = "Confirm_Queue";
/**备份交换机的队列*/
public static final String BACKUP_QUEUE_NAME = "Backup_Queue";
/**报警队列的名称*/
public static final String ALARM_QUEUE_NAME = "Alarm_Queue";
/**绑定的由路Key*/
public static final String ROUTING_KEY = "Confirm_RoutingKey";
/**声明交换机及其备份交换机*/
@Bean(EXCHANGE_NAME)
public DirectExchange ConfirmExchange (){
QueueBuilder.durable();
/*交换机的名称,是否持久化,设置alternate_exchange备份交换机,备份交换机的名称*/
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
/**声明队列*/
@Bean(QUEUE_NAME)
public Queue ConfirmQueue(){
/*
* 参数列表:
* 1.队列名称
* 2.是否持久化
* 3.是否共享
* 4.是否自动删除
* 5.其它参数
* */
return new Queue(QUEUE_NAME,true,false,false,null);
}
/**绑定交换机与队列*/
@Bean
public Binding queueBindingExchange (){
return BindingBuilder.bind(ConfirmQueue()).to(ConfirmExchange()).with(ROUTING_KEY);
}
/**声明备份交换机*/
@Bean(BACKUP_EXCHANGE_NAME)
public FanoutExchange BackupFanoutExchange (){
return new FanoutExchange(BACKUP_EXCHANGE_NAME,true,false,null);
}
/**声明备份队列*/
@Bean(BACKUP_QUEUE_NAME)
public Queue BackupQueue (){
return new Queue(BACKUP_QUEUE_NAME,true,false,false,null);
}
/**声明报警交换机*/
@Bean(ALARM_QUEUE_NAME)
public Queue AlarmQueue (){
return new Queue(ALARM_QUEUE_NAME,true,false,false,null);
}
/*
* 对于备份队列,其作用就将消息重新分发给消息费者,让其进行消费,报警队列应该更多的是打印其出现的问题。
* */
/**绑定备份队列与交换机/
public Binding BackupBinding (){
return BindingBuilder.bind(BackupQueue()).to(BackupFanoutExchange());
}
/**绑定报警队列与交换机*/
@Bean
public Binding AlarmBinding (){
return BindingBuilder.bind(AlarmQueue()).to(BackupFanoutExchange());
}
}
- 创建备份队列和报警队列
备份队列,也就是讲交换机中的消息重新分发给消息的消费者,让其处理。
报警队列,也就是提醒消息的生产者,有消息因为由路不通导致消息传输分发失败,自我理解也就是消息日志的打印。
- 交换机概念
生产者将消息,不会直接传递到队列中,而是通过交换机,将消息传输到队列中,再通过队列传递到消费者。交换机就是将消息分发传递到不同的队列中。
生产者在向交换机传递消息的同时会携带指定由路值 routingKey ,交换机通过指定由路值 routingKey 找到指定的队列,若该指定队列不存在,则会抛弃该消息。
- 临时队列
临时队列,顾名思义就是临时的,就是在创建并使用完成后,也就是在消费者断开与队列的连接之后,会被删除。也就是未经过持久化的队列。
//RabbitMQ为我们提供创建随机队列的方法
//通过信道创建队列
//返回的是随机队列名
String queueName = channel.queueDeclare().getQueue();
- 绑定(Binding)
也就是将交换机与创建的队列进行绑定。
Exchange 交换机类型 Direct Exchange(直接)、Fanout Exchange(扇出)、Topic Exchange(主题)、Headers Exchange(标题)、Default Exchange (默认交换机)。
这里的四种交换机类型对应着,RabbitMQ中的工作模式中的四种。
注:这里的默认交换机,在使用中通过空串 “” 表示。
Fanout Exchange Fanout类型,也就是发布订阅模式中;简单理解就是广播模式,通过广播将消息发送出去,发送给所有已知队列中。也就是所有已知绑定队列,消费者都可以接收,就是不管routingKey是否相同。
该省略很多的关键步骤。
//创建并获取信道
Channel channel = RabbitMQUtil.getChannel();
//创建交换机
/**
* 参数列表:
* 1.交换机名称
* 2.交换机类型
* */
String exchangeName = "Fanout Exchange";
channel.exchangeDeclare(exchangeName,"fanout");
//创建临时队列
/**
* 临时队列,在创建并传递完所有的消息后,挥别RabbitMQ删除
* */
String tempQueueName = channel.queueDeclare().getQueue();
String message = "发送消息的内容";
//发布消息
/**
* 参数列表:
* 1.交换机名称,默认交换机为空串
* 2.队列名称
* 3.其他信息
* 4.消息内容
* */
channel.basicPublish(exchangeName,tempQueueName,null,message.getBytes(StandardCharsets.UTF_8));
//获取信道
Channel channel = RabbitMQUtil.getChannel();
System.out.println("消息等待接收!!!");
//指定交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//随机队列名称
String queueName = channel.queueDeclare().getQueue();
System.out.println("消息队列:"+queueName);
//绑定交换机与指定
/**
* 参数列表:
* 1.队列名称
* 2.交换机名称
* 3.routingKey值
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
Direct Exchange
直接交换模式,与Fanout 模式(扇形模式/广播模式)类似,交换机与队列通过routingKey,传递消息。fanout模式是交换机绑定的所有队列都可以获取相同的消息,不管routingKey是否不同,而direct是交换机是通过routingKey与队列进行绑定,也就是队列从交换机获取消息,必须通过routingKey。
所交换机在传递消息时,根据routingKey找寻队列不存在,交换机会将消息直接丢弃。
多重绑定:若交换机与多个队列绑定,且routingKey相同,那么这种模式与Fanout 广播模式类似,由于RabbitMQ的默认分发机制为轮询,一个队列接收并处理一个消息。
主题模式,也就是在原有的基础上对routingKey支持通配符的效用,提高了应用场景。在消费端绑定多个routingKey。
在此处的通配符:#匹配一个或者是多个符号;* 匹配一个符号
省略~~~~
队列相关 死信队列 死信,由于其他原因导致消息无法被消费者进行消费。如果未配置死信队列,其死信消息会直接丢失,配置死信队列后,RabbitMQ会将死信消息丢进死信队列中,然后通过死信交换机,将死信信息重新分发给消费者。
消息TTL(Time To Live)也就是消息的存活时间。
死信机制 (DLX Dead Letter Exchange),本人理解就是避免消息的丢失,并将消息重新交由
死信的来源:
- 消息被否定确认(basicNack和basicReject,且为手动应答)。
- 消息处于队列的时间超过TTL时间。
- 消息队列中的消息数量超过消息队列的最大数量。
队列的最大消息数量:x-max-length
//为模拟死信消息
//设置极短的过期时间 TTL
//模拟消费者未应答
//手动设置队列中最大长度
//其中一项原因,就可称为死信消息
/**
* @author 居無何
* date: 2022/4/8 21:30
* Description: 生产者
*/
public class Producer {
private static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 死信模拟:
* 消息TTL过期
* 消息被否认
* 消息队列中的消息超过最大数
* */
//设置消息TTL-->添加 amqp-client 依赖
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//发送消息
for (int i = 0; i < 6; i++) {
/**
* 参数列表:
* 交换机名称
* routingKey
* 其他参数-可以设置过期时间TTL
* 消息体
* */
channel.basicPublish(NORMAL_EXCHANGE,"NormalLetterRoutingKey",properties,("发送的消息体:"+i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送成功!!!");
//关闭通道和连接
channel.close();
}
}
/**
* @author 居無何
* date: 2022/4/10 17:23
* Description: 正常队列
*/
public class NormalLetterConsumer {
/**
* 正常交换机
* */
private static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";
/**
* 死信交换机
* */
private static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明普通队列与死信队列的名称
String NormalQueueName = "NormalLetterQueue";
String DeadQueueName = "DeadLetterQueue";
//声明死信交换机与普通队列
/**
* 参数列表:
* 交换机名称
* 交换机类型 Fanout、Direct、Topic、Headers 四种模式
* */
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String, Object> params = new HashMap<String, Object>();
//也就是通过map集合设置配置信息类似的
//设置死信交换机,也就是在普通交换机中的消息出现死信后,将死信消息直接转发到死信交换机
params.put("x-dead-letter-exchange","DEAD_EXCHANGE");
params.put("x-dead-letter-routing-key","DeadLetterRoutingKey");
//声明死信队列与普通队列
/**
* 参数列表:
* 队列名称
* 是否持久书
* 是否共享
* 是否自动删除
* 其他参数
* */
channel.queueDeclare(DeadQueueName,false,false,false,null);
channel.queueDeclare(NormalQueueName,false,false,false,params);
//绑定交换机与队列
/**
* 参数列表:
* 队列名
* 交换机名
* routingKey
* */
channel.queueBind(DeadQueueName,DEAD_EXCHANGE,"DeadLetterRoutingKey");
channel.queueBind(NormalQueueName,NORMAL_EXCHANGE,"NormalLetterRoutingKey");
System.out.println("等待接收消息.....");
//回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Normal 接收到消息"+message);
// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("死信模拟-未被确认!!!!");
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
/**
* 参数列表
* 队列名
* 是否自动应答-手动应答false
* 成功回调
* 不成功回调
* */
channel.basicConsume(NormalQueueName, false, deliverCallback, cancelCallback);
}
}
/**
* @author 居無何
* date: 2022/4/8 21:31
* Description:
*/
public class DeadLetterConsumer {
/**
* 死信交换机
* */
private static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明普通队列与死信队列的名称
String DeadQueueName = "DeadLetterQueue";
//声明死信交换机
/**
* 参数列表:
* 交换机名称
* 交换机类型 Fanout、Direct、Topic、Headers 四种模式
* */
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明死信队列
channel.queueDeclare(DeadQueueName,false,false,false,null);
//绑定死信交换机与死信队列
channel.queueBind(DeadQueueName,DEAD_EXCHANGE,"DeadLetterRoutingKey");
System.out.println("等待消息的接收......");
//回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Dead 接收到消息"+message);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(DeadQueueName, true, deliverCallback, cancelCallback);
}
}
死信交换机绑定死信队列
正常交换机绑定正常队列
延迟队列,其队列内部是有序的,延迟队列顾名思义就是队列中的消息延迟处理,也是就是队列中的消息到达指定的时间后,进行处理。
应用场景:商品的订单,下单后在30分钟内进行支付等,若超出时间限制内,则取消订单。;邮件的跨时间发送。
TTL (Time To Live 生存时间),也就是队列中的消息存活的最大时间。其单位为毫秒制;若消息设置了TTL,或者消息队列设置TTL,当消息超过TTL,则消息会变成 “死信”,当消息变成死信,则通过 DLX ,死信消息发送给死信交换机,由死信交换机发送给死信队列进行重新消费。
若消息的TTL和队列的TTL同时设置,则会旋转TTL设置时间小的执行。
单消息设置TTL
//在消息发送的时候传递并设置TTL信息
AMQP.BasicProperties properties = new AMQP.BasicProperties().
builder().
expiration("10000").
build();
//将properties传递到参数中
channel.basicPublish(
NORMAL_EXCHANGE,
"NormalLetterRoutingKey",
properties,
("发送的消息体:"+i).getBytes(StandardCharsets.UTF_8)
);
队列设置TTL
//在队列声明是,传递参数 params
Map<String, Object> params = new HashMap<String, Object>();
//设置队列的TTL--->ttl是毫秒值
arguments.put("x-message-ttl", 10000);
//传递参数
channel.queueDeclare(NormalQueueName,false,false,false,params);
延迟队列优化
就是创建一个通用队列,不设置TTL,在消息生产者发送消息的时候,设置TTL消息。
延迟队列(死信问题) 在延迟队列中的消息过期,变成“死信”之后,其消息通过死信交换机,进入到死信队列中,在死信队列中的消息TTL不同,死信队列不会根据消息设置的TTL长短而重新发送消息,而是根据消息过期加入死信队列的顺序进行,消息的重新发送。
解决方法:使用延迟队列插件。
- 下载延迟队列插件
下载:https://www.rabbitmq.com/community-plugins.html
-
安装延迟队列插件
- 若是Windows版本
如果所使用的Windows版本,则将下载到的插件文件(后缀为.ez)放置到RabbitMQ的安装目录下的plugins文件下。
并通过命令栏安装,该插件。
**rabbitmq-plugins enable rabbitmq_delayed_message_exchange**
- 若是Linux系统
与Windows系统安装类似,将下载好后的插件,通过Xshell上传到Linux中的RabbitMQ安装目录的plugins文件。
执行命令安装插件
**rabbitmq-plugins enable rabbitmq_delayed_message_exchange**
然后重启RabbitMQ即可。
“Hello Word”模式就是简单模式,就是在生产者和消费者之间,RabbitMQ作为中间消息缓冲。
这里使用P代表生产者 Producer,使用C代表消费者 Consumer。
- 生产者
public class Producer {
/**队列名称*/
public final static String QUEUE_NAME = "Hello Word";
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接的相关信息
//连接IP地址
connectionFactory.setHost("192.168.137.1");
//连接用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//生成队列
//对于生成队列的参数列表,队列名、是否持久化、是否进行共享、是否自动删除、其他参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息声明
String message = "Hello Word";
//发送消息
//参数列表:指定交换机、发送到的队列(路由key)、其他参数、发送的消息体
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
//发送消息完毕
System.out.println("消息已发送");
//在此应做关闭通道处理-省略!!!
}
}
- 消费者
public class Consumer {
/**指定的队列名*/
public final static String QUEUE_NAME = "Hello Word";
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接的相关信息
//连接IP地址
connectionFactory.setHost("192.168.137.1");
//连接用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//消费实现,在此可以通过从DefaultConsumer类中的handleDelivery方法实现消息处理
DefaultConsumer consumer = new DefaultConsumer(channel);
//监听队列
//参数列表:队列名称、是否自动回复、消费方法
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Works queues
工作队列(任务队列),简单理解就是多个消费者对同一队列进行消费,多个消费者对应一个队列。也就是多个线程同时处理同一个消息队列中的消息。避免立即执行资源紧密型任务。
并且是消息队列中的消息是轮询分发的。
由于要启动多个线程,所以可以对的封装一个RabbitMQ工具类。
public class RabbitMQUtil {
public static ConnectionFactory getConnectionFactory (){
//创建链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接的相关信息
//连接IP地址
connectionFactory.setHost("192.168.137.1");
//连接用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
return connectionFactory;
}
/**
* 获取链接
* */
public static Connection getConnection () throws IOException, TimeoutException {
ConnectionFactory connectionFactory = RabbitMQUtil.getConnectionFactory();
//创建连接
Connection connection = connectionFactory.newConnection();
return connection;
}
/**
* 获取信道
* */
public static Channel getChannel () throws IOException, TimeoutException {
Connection connection = RabbitMQUtil.getConnection();
//创建信道
Channel channel = connection.createChannel();
return channel;
}
}
- 生产者
public class NewTask {
/**队列名称*/
public final static String QUEUE_NAME = "Hello Word";
public static void main(String[] args) throws IOException, TimeoutException {
//创建信道
Channel channel = RabbitMQUtil.getChannel();
//生成队列
//对于生成队列的参数列表,队列名、是否持久化、是否进行共享、是否自动删除、其他参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息声明
String message = "Hello Word !!!";
//发送消息
//参数列表:指定交换机、发送到的队列(路由key)、其他参数、发送的消息体
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//发送消息完毕
System.out.println("消息已发送");
//在此应做关闭通道处理
}
}
- 消费者
实际上是含有多个works。
public class Works {
/**指定的队列名*/
public final static String QUEUE_NAME = "Hello Word";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
System.out.println("等待消息接收");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息:"+message);
//耗时模拟
try {
doWork(message);
} finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//取消消息回调
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
System.out.println("消息传输中断");
});
}
}
这里先启动的是Work_2后启动的是Work_1,由于轮询机制,故如此。
发布/订阅,也就是一个生产者发送的消息,由交换机处理,发送给订阅该交换机的所有队列,即就是一条消息多个消费者处理。也就是消息群发模式。一个生产者,一个交换机,多个队列,多个消费者。
发布订阅的模式对应交换机的 Fanout 广播交换机。
public class publisher {
private static final String EXCHANGE_NAME = "Change_Name";
public static void main(String[] args) throws IOException, TimeoutException {
//创建并获取信道
Channel channel = RabbitMQUtil.getChannel();
//创建交换机
/**
* 参数列表:
* 1.交换机名称
* 2.交换机类型
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//创建临时队列
/**
* 临时队列,在创建并传递完所有的消息后,挥别RabbitMQ删除
* */
String tempQueueName = channel.queueDeclare().getQueue();
System.out.println("创建临时队列:"+tempQueueName);
String message = "发送消息的内容";
//发布消息
/**
* 参数列表:
* 1.交换机名称,默认交换机为空串
* 2.routingKey值
* 3.其他信息
* 4.消息内容
* */
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("消息发送成功!!!!");
}
}
public class subscriber {
private static final String EXCHANGE_NAME = "Change_Name";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
System.out.println("消息等待接收!!!");
//指定交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//随机队列名称
String queueName = channel.queueDeclare().getQueue();
System.out.println("消息队列:"+queueName);
//绑定交换机与指定
/**
* 参数列表:
* 1.队列名称
* 2.交换机名称
* 3.routingKey值
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
Routing
路由模式,就是交换机与队列之间通过routingKey进行绑定,也就是生产者在发送消息时,就指明routingKey,后交换机根据routingKey,将消息发送给绑定的队列,后消费者接收消息。
多重绑定,类似于广播模式。
- 通过routingKey将交换机与队列进行绑定
- 生产者在发送消息指定routingKey
- 交换机根据routingKey将消息传递给队列
在路由模式,就对应 Direct 交换机。
/**
* @author 居無何
* date: 2022/4/7 20:26
* Description: routing模式的生产者,routing模式的交换机类型为 Direct
*/
public class RoutingProducer {
private static final String EXCHANGE_NAME = "RoutingExchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建信道
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//队列名称声明
String queueName_1 = "RoutingConsumer_1";
String queueName_2 = "RoutingConsumer_2";
//routingKey
String routingKey_1 = "routingKey_1";
String routingKey_2 = "routingKey_2";
//声明队列
/**
* 参数列表:
* 1.队列名称
* 2.对否持久化
* 3.是否共享
* 4.知否自动应答
* 5.其他参数
* */
channel.queueDeclare(queueName_1,false,false,true,null);
channel.queueDeclare(queueName_2,false,false,true,null);
//交换机与队列进行绑定
/**
* 交换机与队列参数列表:
* 1.队列名
* 2.交换机名称
* 3.routingKey
* */
channel.queueBind(queueName_1,EXCHANGE_NAME,routingKey_1);
channel.queueBind(queueName_2,EXCHANGE_NAME,routingKey_2);
//发送消息
/**
* 参数列表:
* 1.交换机名称
* 2.routingKey值
* 3.其他参数
* 4.消息体
* */
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME,routingKey_1,null,("发送给RoutingConsumer_1 的消息体"+i).getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,routingKey_2,null,("发送给RoutingConsumer_2 的消息体"+i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送成功!!!!");
}
}
/**
* @author 居無何
* date: 2022/4/7 20:27
* Description: 消费者routingconsumer_1和routingconsumer_2只需更改routingkey值即可
*/
public class RoutingConsumer_1 {
private static final String EXCHANGE_NAME = "RoutingExchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("等待消息接收!!!");
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare("RoutingConsumer_1" ,false,false,true,null);
//声明交换机与队列的绑定
channel.queueBind("RoutingConsumer_1",EXCHANGE_NAME,"routingKey_1");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消息体: '" + message + "'");
};
//消息
channel.basicConsume("RoutingConsumer_1", true, deliverCallback, consumerTag -> { });
System.out.println("消息接收完毕!!!");
}
}
Topics
主题模式,就是在其以上的基础上增加了 routingKey 的通配符匹配,也就是只要routingKey符合匹配的规则,交换机可将其信息发送到绑定的队列中。
也就是增加通配匹配机制。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0g7VUNu2-1650705153826)(http://www.kaotop.com/file/tupian/20220425/1649345702451-4d0efb01-5b6f-4f3c-9efa-fd99c575f243.png)]
以下参照官网供图的代码实现。
/**
* @author 居無何
* date: 2022/4/7 23:22
* Description: Topic模式下的生产者
*/
public class Producer {
private static final String EXCHANGE_NAME = "Topic";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
/**
* 参数列表:
* 1.交换机名称
* 2.交换机类型
* */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列名--可省略
String Q1 = "Q1";
String Q2 = "Q2";
//声明队列
/**
* 参数列表:
* 1.队列名称
* 2.是否持久化
* 3.是否分享消息
* 4.是否自动应答
* 5.其他参数
* */
channel.queueDeclare(Q1,false,false,true,null);
channel.queueDeclare(Q2,false,false,true,null);
//绑定交换机与队列
/**
* 参数列表:
* 1.队列名
* 2.交换机名
* 3.routingKey值
* */
//由于使用的Topic模式,可直接在routingKey中使用通配符, *代表一个字符,#可以代表一个或者多个字符
channel.queueBind(Q1,EXCHANGE_NAME,"*.orange.*");
channel.queueBind(Q2,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(Q2,EXCHANGE_NAME,"lazy.#");
/**
* 参数列表:
* 1. 交换机名称
* 2.routingKey值
* 3.其他参数
* 4.发送的消息体
* */
//发送消息
for (int i = 0; i < 6; i++) {
channel.basicPublish(EXCHANGE_NAME,i+".orange."+i,null,("发送给Q1的消息体: routingKey="+i+".orange."+i).getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,i+"."+i+".rabbit",null,("发送给Q2的消息体: routingKey="+i+"."+i+".rabbit").getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"lazy."+i,null,("发送给Q2的消息体: routingKey="+"lazy."+i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("消息发送完毕!!!");
}
}
/**
* @author 居無何
* date: 2022/4/7 23:50
* Description: Topic 模式下的消费者Q2
*/
public class ConsumerQ1 {
private static final String EXCHANGE_NAME = "Topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("等待消息接收!!!");
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare("ConsumerQ1" ,false,false,true,null);
//声明交换机与队列的绑定
channel.queueBind("ConsumerQ1",EXCHANGE_NAME,"*.orange.*");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消息体: '" + message + "'");
};
//消息
channel.basicConsume("ConsumerQ1", true, deliverCallback, consumerTag -> { });
}
}
/**
* @author 居無何
* date: 2022/4/7 23:50
* Description: Topic模式下的消费者Q1
*/
public class ConsumerQ2 {
private static final String EXCHANGE_NAME = "Topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("等待消息接收!!!");
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare("ConsumerQ2" ,false,false,true,null);
//声明交换机与队列的绑定
channel.queueBind("ConsumerQ2",EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind("ConsumerQ2",EXCHANGE_NAME,"lazy.#");
//消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消息体: '" + message + "'");
};
//消息
channel.basicConsume("ConsumerQ2", true, deliverCallback, consumerTag -> { });
}
}
RabbitMQ 遇到问题
Not management user
我的解决方式是:重启RabbitMQ或者添加用户的标签为administrator
#添加用户
rabbitmqctl add_user username password
#列出所有用户
rabbitmqctl list_users
#修改用户角色
rabbitmqctl set_user_tags User Tag
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZL0oKHj6-1650705153827)(http://www.kaotop.com/file/tupian/20220425/1648353287219-25909806-7345-47be-bba6-737f791f4f00.png)]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)