目录
RabbitMQ简介:
什么是MQ:
应用场景:
MQ不足:
什么时候使用MQ?
MQ主要解决哪些层面问题:
RabbitMQ快速入门:
下载安装:
配置环境变量:
启动管理工具:
RabbitMQ添加环境:
学习五种队列
RabbitMQ简介: 什么是MQ:
定义:消息队列
什么是队列:队列即为一个消息管道,以管道的形式去做消息传递,遵循先入先出的规则
消息队列也支持多线程
什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
市面上常见的消息队列开源组件有哪些?
ActiveMQ:是老牌的消息中间件,国内很多公司过去运用的还是非常广泛,功能很强大。但问题在于没法确认ActiveMQ可以支撑互联网公司的高并发,高负载以及高吞吐的复杂场景,在国内的互联网公司落地的较少。
RabbitMQ:可以支撑互联网公司的高并发,高负载以及高吞吐的复杂场景,同样有非常完善便捷的后台管理界面可以使用。另外他还支持集群化,高可用部署架构,消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用的RabbitMQ的实践也比较多。
RockMQ:是阿里开源,经过阿里的生产环境的超高并发,高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景,而且RocketMQ是基于Java语言开发,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造
Kafka:提供的消息中间件的功能明显较少一些,相对上述巨款MQ中间件要少很多。但是Kafka的优势在于专为超高吞吐量的实时日志采集,实时数据同步,实时数据计算等场景来设计。因此Kafka在大数据领域中配合实时计算技术(比如spark streaming,storm,flink)使用的较多。
应用场景:1.在电商项目,例如双11秒杀抢购商品,结算时,界面会提醒我们稍等一下
MQ不足:- 系统更复杂,多了一个MQ组件
- 消息传递路径更长,延时会增加
- 消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证
- 上游无法知道下游的执行结果,这一点是很致命的
调用方实时依赖执行结果的业务场景,请使用调用,而不是MQ。
什么时候使用MQ?- 数据驱动的任务依赖
- 上有不关心下游执行结果
- 异步返回执行时间长
为什么会产生消息队列?有什么原因?
不同进程(process)之间传递信息时,两个进程之间耦合程度过高,改动一个进程,
MQ主要解决哪些层面问题:- 应用解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。
- 异步请求。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。
- 流量削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:
- 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
- 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
- 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
- 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
- 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com
下载安装:RabbitMQ由Erlang语言开发,需要安装与RabbitMQ版本对应的Erlang语言环境。RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html 配置环境变量:
- 在环境变量中新增ERLANG——HOME,路径为erl的路径,不包含bin
- 在Path变量里新增%ERLANG——HOME%\bin
- 打开cmd输入erl查看是否成功
1.双击 RabbitMQ Command Prompt(sbin dir)
2.输入命令:rabbitmq-plugins enable rabbitmq_management
这样就启动管理工具了,可以试一下命令:
停止:net stop RabbitMQ 等价于 RabbitMQ Service -stop
启动:net start RabbitMQ 等价于 RabbitMQ Service -start
3.在浏览器输入地址查看:http://127.0.0.1:15672/
4.输入账号名和密码:guest
RabbitMQ的工作原理
下图是RabbitMQ的基本结构:
组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
RabbitMQ添加环境:添加用在Admin界面中添加user
用户角色
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行 *** 作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建Virtual Hosts
选中Admin用户,设置权限:
看到权限已加:
4.4.管理界面中的功能
学习五种队列消息消费方 》 MQ 》 消息生产方
实例(生产方提供消息发送至中间件)上游发送消息到中间件:
config.properties
rabbitmq-server-address=localhost
rabbitmq-server-port=5672
rabbitmq-server-virtualHost=testUser
rabbitmq-server-vm=q_test2022425
rabbitmq-server-username=admin
rabbitmq-server-password=admin
private static String SERVER_ADDRESS;
private static Integer SERVER_PORT;
private static String SERVER_VIRTUAL_HOST;
private static String SERVER_VM;
private static String SERVER_USERNAME;
private static String SERVER_PASSWORD;
static {
try(InputStream inputStream = MessageOperator.class.getResourceAsStream("/config.properties");) {
Properties properties = new Properties();
properties.load(inputStream);
SERVER_ADDRESS = properties.getProperty("rabbitmq-server-address");
SERVER_PORT = Integer.parseInt(properties.getProperty("rabbitmq-server-port"));
SERVER_VIRTUAL_HOST = properties.getProperty("rabbitmq-server-virtualHost");
SERVER_VM = properties.getProperty("rabbitmq-server-vm");
SERVER_USERNAME = properties.getProperty("rabbitmq-server-username");
SERVER_PASSWORD = properties.getProperty("rabbitmq-server-password");
}catch (Exception e){
e.printStackTrace();
}
}
public void sendToMQ(String message){
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
Channel channel = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
channel = newConnection.createChannel();
//7.创建队列
channel.queueDeclare(SERVER_VM,false,false,false,null);
//8.将消息发送至RabbitMQ
channel.basicPublish("",SERVER_VM,null,message.getBytes());
if (channel.waitForConfirms()){
System.out.println("发送成功!");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
newConnection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
实例(消费方从中间件接收消息)下游接收消息从中间件获取上游消息:
private static String SERVER_ADDRESS;
private static Integer SERVER_PORT;
private static String SERVER_VIRTUAL_HOST;
private static String SERVER_VM;
private static String SERVER_USERNAME;
private static String SERVER_PASSWORD;
static {
try(InputStream inputStream = MessageOperator.class.getResourceAsStream("/config.properties");) {
Properties properties = new Properties();
properties.load(inputStream);
SERVER_ADDRESS = properties.getProperty("rabbitmq-server-address");
SERVER_PORT = Integer.parseInt(properties.getProperty("rabbitmq-server-port"));
SERVER_VIRTUAL_HOST = properties.getProperty("rabbitmq-server-virtualHost");
SERVER_VM = properties.getProperty("rabbitmq-server-vm");
SERVER_USERNAME = properties.getProperty("rabbitmq-server-username");
SERVER_PASSWORD = properties.getProperty("rabbitmq-server-password");
}catch (Exception e){
e.printStackTrace();
}
}
public void receiveToMQ(){
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
Channel channel = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
channel = newConnection.createChannel();
//7.创建队列
channel.queueDeclare(SERVER_VM,false,false,false,null);
//8.将从RabbitMQ接收消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(SERVER_VM,true,queueingConsumer);
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
System.out.println("下游接收模块已接受上游信息:"+new String(delivery.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
channel.close();
newConnection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
加锁:
(生产方):
public void sendToMQ(String message){
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
Channel channel = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
channel = newConnection.createChannel();
//7.创建队列
//b持久化 b1独占一个队列 b2在空闲时候自动删除
channel.confirmSelect();
channel.queueDeclare(SERVER_VM,true,false,true,null);
//8.将消息发送至RabbitMQ
channel.basicPublish("",SERVER_VM,null,message.getBytes());
if (channel.waitForConfirms()){
System.out.println("发送成功!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
(消费方):
public void receiveToMQ(){
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
Channel channel = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
channel = newConnection.createChannel();
//7.创建队列
channel.queueDeclare(SERVER_VM,true,false,true,null);
//8.将从RabbitMQ接收消息
//回调
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = "下游接收模块已接受上游信息:"+new String(delivery.getBody());
System.out.println("[x] Received"+message);
};
//参数2设置为true代表关闭自动ack机制(MQ自动删除数据)
channel.basicConsume(SERVER_VM,true,deliverCallback,consumerTag -> {});
} catch (Exception e) {
e.printStackTrace();
}
}
ack(使用5版本的代码):
(生产方):
public void sendByMQ(){
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
Channel channel = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
channel = newConnection.createChannel();
//7.创建队列
//b持久化 b1独占一个队列 b2在空闲时候自动删除
channel.queueDeclare(SERVER_VM,true,false,true,null);
channel.basicQos(1);//通道索引为1
for (int i = 0; i < 30; i++) {
String msg = "【美团外卖】订单编号:"+(i+1)+",正在提交订单";
System.out.println(msg);
channel.basicPublish("",SERVER_VM,null,msg.getBytes());
}
}catch (Exception e){
e.printStackTrace();
}
}
(消费方):
public void getMsg() {
//1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2.设置 *** 作的虚拟机
factory.setVirtualHost(SERVER_VIRTUAL_HOST);
//3.设置 *** 作MQ服务器的主机以及端口号
factory.setHost(SERVER_ADDRESS);
factory.setPort(SERVER_PORT);
//4.设置账号和密码
factory.setUsername(SERVER_USERNAME);
factory.setPassword(SERVER_PASSWORD);
Connection newConnection = null;
try {
//5.创建通信连接(创建时可能会出现io异常)
newConnection = factory.newConnection();
//6.创建通道
final Channel channel = newConnection.createChannel();
//7.创建队列
channel.queueDeclare(SERVER_VM, true, false, true, null);
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String msg = new String(delivery.getBody(),"UTF-8");
try {
Thread.sleep(800);
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("下游接收成功!");
System.out.println("Received :"+msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(SERVER_VM,false,deliverCallback,consumerTag -> {});
} catch (Exception e) {
e.printStackTrace();
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)