RabbitMQ消息中间件文章目录
- 1.MQ引言
- 1.1 什么是MQ
- 1.2 MQ有哪些
- 1.3 不同MQ的特点
- 2.RabbitMQ的引言
- 2.1 RabbitMQ
- 2.2 RabbitMQ的安装
- 2.2.1 下载
- 2.2.2 安装
- 2.2.3 MQ的应用场景
- 3.RabbitMQ配置
- 4.点对点模型
- 4.1.生产者:发送消息
- 4.2.消费者:接收消息
- 5.工作队列work
- 5.1 工作队列平均分配
- 5.1.1 生产者
- 5.1.2 消费者1
- 5.1.3 消费者2
- 5.2 工作队列多劳多得
- 5.2.1 生产者
- 5.2.2 消费者1
- 5.2.3 消费者2
- 6.发布订阅模式fanout
- 6.1 生产者
- 6.2 消费者1
- 6.3 消费者2
- 7.路由模式direct
- 7.1 生产者
- 7.2 消费者1
- 7.3 消费者2
- 8.通配符模式topic
- 8.1 生产者
- 8.2 消费者1
- 8.3 消费者2
- 9. SpringBoot整合RabbitMQ
- 9.1 点对点模型
- 9.1.1 生产者
- 9.1.2 消费者
- 9.2 工作队列平均消费
- 9.2.1 生产者
- 9.2.2 消费者
- 9.3 发布/订阅模式
- 9.3.1 生产者
- 9.3.2 消费者
- 9.4 路由模式
- 9.4.1 生产者
- 9.4.2 消费者
- 9.5 通配符模式
- 9.5.1 生产者
- 9.5.2 消费者
- 10.RabbitMQ的集群
- 10.1 主从集群
- 10.2 镜像集群
- 10.2.1 查看当前策略
- 10.2.2 添加策略
- 10.2.3 删除策略
MQ----Message Queue消息队列,别名 消息中间件,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 MQ有哪些 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ,RabbitMQ,阿里巴巴自主开发的RocketMQ,炙手可热的Kafka。
1.3 不同MQ的特点1.ActiveMQ
ActiveMQ是Apache出品的,最流行的,能力最强的开源消息总线,它是一个安全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小学企业颇受欢迎!
2.RabbitMQ ----追求数据一致性 数据可靠 Spring默认集成 不会丢失数据
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性,安全,AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
3.RocketMQ —阿里巴巴 开源的功能相对比较少,官方的版本功能不错
RocketMQ是阿里巴巴开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
4.Kafa —追求高吞吐量,抗非常高的并发,会丢失数据
Kafka是linkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复,丢失,错误没有严格的要求,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。2.RabbitMQ的引言 2.1 RabbitMQ
基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
SpringBoot默认集成RabbitMQ消息中间件,
RabbitMQ不丢失任何的数据
AMQP协议是在2003年时被提出的,最早用于解决金融领域不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wirelevel protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是之间定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
2.2 RabbitMQ的安装 2.2.1 下载Erlang下载地址(各版本都可下载)
http://www.erlang.org/downloads
我下载的是19.3,地址:http://erlang.org/download/otp_win64_19.3.exe
RabbitMQ下载地址:
Github仓库:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-3.7.7.exe
Bintray仓库:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7.exe
下载完如下图:
2.2.2 安装Erlang安装:
安装过程简单粗暴,以管理员身份运行,然后一直next即可
安装
完成
配置ERLANG_HOME:
配置Erlang的path:
RabbitMQ安装:
安装
完成
配置RABBITMQ_HOME:
配置RabbitMQ的path:
到此,RabbitMQ已经安装完毕了,打开【任务管理器】中的【服务】项,即可看到有一个【RabbitMQ】服务正在执行。
以管理员身份运行cmd.exe,进入目录D:Program Filesrabbitmq_server-3.7.9sbin(RabbitMQ Server安装目录),
运行cmd命令:rabbitmq-plugins.bat enable rabbitmq_management 激活管理插件
以管理员身份运行cmd.exe,运行命令:rabbitmq-service install 安装服务,运行 net stop RabbitMQ && net start RabbitMQ。启动RabbitMQ Server:
进入控制台:
地 址:http://localhost:15672/
用户名:guest
密 码:guest
2.2.3 MQ的应用场景1.异步处理
注册 —> 发送邮件 —>发送短信
串行方式:
并行方式:
消息队列:
2.应用解耦
场景:
缺点:当库存系统出现障碍时,订单就会失败。订单系统和库存系统高耦合,引入消息队列。
3.流量削峰
3.RabbitMQ配置访问RabbitMQ控制台:
http://localhost:15672/
添加RabbitMQ依赖:
4.点对点模型com.rabbitmq amqp-client5.7.2
一个生产者,对应一个消费者
在上图的模型中,有以下概念:
。P:生产者,也就是要发送消息的程序
。C: 消费者,消息的接收者,会一直等待消息的到来
。queue:消息队列,图中的红色部分,类似于一个邮箱,可以缓存消息,生产者想其中投递消息,消费者从其中
取出消息。
4.1.生产者:发送消息package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.tangguanlin.rabbitmq.Utils.RabbitMQUtils; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { //生产消息 public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",false ,false,false,null); channel.basicPublish("","hello", null,"hello rabbitmq".getBytes()); //6.释放资源 channel.close();; connection.close(); //RabbitMQUtils.closeConnectionAndChanel(channel,connection); } }4.2.消费者:接收消息
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); channel.queueDeclare("hello",false ,false,false,null); channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(new String(body)); } }); //6.释放资源 channel.close();; connection.close(); } }5.工作队列work
work queue,任务模型,当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型,让多个消费者绑定到一个队列同事消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
类似于1个CPU,多个打印机
角色: 一对多
。p:生产者 任务的发布者
。C1:消费者1 领取任务并且完成任务,假完成任务速度较慢
。C2:消费者2 领取任务并且完成任务,假设完成任务速度快
5.1 工作队列平均分配平均分配的缺点:
RabbitMQ给消费者1分配5个,消费者2分配5个,当消费者1执行完第3个时,宕机了,这是消费者1的第4个,第5个消息丢失,这是业务逻辑无法接受的。业务功能不希望失去任何消息。
5.1.1 生产者package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueProducer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); //5.发布消息 for(int i=1;i<=20;i++){ channel.basicPublish("","work",null,(i+"hello work queue").getBytes()); } //6.释放资源 channel.close(); connection.close(); } }5.1.2 消费者1
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; import java.util.concurrent.TimeoutException; import java.util.logging.Handler; public class WorkQueueCustomer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1"+new String(body)); try{ Thread.sleep(2000); //消费者1 处理速度更慢 }catch(Exception e){ e.printStackTrace(); } } }); channel.close();; connection.close();; } }5.1.3 消费者2
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueCustomer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2"+new String(body)); } }); channel.close();; connection.close();; } }5.2 工作队列多劳多得
多劳多得能解决平均分配模式的缺点。
消息放队列中,每次给通道1个消息,每次只能消费一个消费,取消自动确认。
//一次只接受一条未确认的消息 channel.basicQos(1);
//参数2:关闭自动确认消息 channel.basicConsume("work",false,new DefaultConsumer(channel){
//手动确认 参数1:手动确认消息标识 参数2: false 是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(),true);5.2.1 生产者
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueProducer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); //5.发布消息 for(int i=1;i<=20;i++){ channel.basicPublish("","work",null,(i+"hello work queue").getBytes()); } //6.释放资源 channel.close(); connection.close(); } }5.2.2 消费者1
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueCustomer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); //一次只接受一条未确认的消息 channel.basicQos(1); //参数2:关闭自动确认消息 channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ Thread.sleep(2000); }catch(Exception e){ e.printStackTrace(); } System.out.println("消费者1"+new String(body)); //手动确认 参数1:手动确认消息标识 参数2: false 是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(),true); } }); //channel.close();; //connection.close();; } }5.2.3 消费者2
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkQueueCustomer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通过通道绑定队列 channel.queueDeclare("work",true,false,false,null); //一次只接受一条未确认的消息 channel.basicQos(1); channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2"+new String(body)); //手动确认 参数1:手动确认消息标识 参数2: false 是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(),true); } }); //channel.close();; //connection.close();; } }6.发布订阅模式fanout
场景:用户登录成功后,既要发邮件,又要加积分,
用户登录成功后,发送一条消息给发邮件队列,再发送一条消息给加积分队列。
购物车-------下订单 订单系统
下订单--------库存要更新 库存系统
在开发中,用得比较多
在广播模式下,消息发送的流程是这样的:
。可以有多个消费者
。每个消费者有自己的队列queue
。每个队列都要绑定到交换机Exchange
。生产者发送的消息,只能发送到交换机,交换机来决定发送给哪个队列,生产者无法决定
。交换机把消息发送给绑定过的所有队列
。队列的消息都能拿到消息,实现一条消息被多个消费者消费
缺点:广播,每个队列接收的消息 都是整个交换机的消息,这个时候,其实分2个队列和1个队列效果是一样的,不能对消息进行筛选处理,不能部分消费。
6.1 生产者package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutProducer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); channel.exchangeDeclare("logs","fanout"); channel.basicPublish("logs","",null,"fanout type message".getBytes()); //6.释放资源 channel.close(); connection.close(); } }6.2 消费者1
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutCustomer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通道绑定交换机 channel.exchangeDeclare("logs","fanout"); //临时队列 String queueName = channel.queueDeclare().getQueue(); //5.绑定交换机和队列 channel.queueBind(queueName,"logs",""); //6.接收消息 channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close(); } }6.3 消费者2
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutCustomer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //Connection connection = RabbitMQUtils.getConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通道绑定交换机 channel.exchangeDeclare("logs","fanout"); //临时队列 String queueName = channel.queueDeclare().getQueue(); //5.绑定交换机和队列 channel.queueBind(queueName,"logs",""); //6.接收消息 channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close(); } }7.路由模式direct
可以对发布订阅模式中交换机里的消息进行筛选获取,部分处理。
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这是就要用到Direct类型的Exchange。
在Direct模型下:
。队列和交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
。消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
。Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey完全一致,才会接收到消息。
流程:
图解:
。P:生产者 向Exchange发送消息,发送消息时,会指定一个Routing Key
。X:Exchange(交换机) 接收生产者的消息,然后把消息递交给与Routing Key完全匹配的队列
。C1:消费者1 其所在队列指定了需要Routing Key为error的消息
。C2:消费者2 其所在队列指定了需要Routing Key为info、error、warning的消息
7.1 生产者package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DirectProducer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.绑定交换机 参数1:交换机名称 参数2:路由模式 channel.exchangeDeclare("logs_direct","direct"); //5.发送消息 指定 routingkey String routingKey = "error"; channel.basicPublish("logs_direct",routingKey,null,("这是direct模型发布的基于routing key:"+routingKey+"的消息").getBytes()); //6.释放资源 channel.close(); connection.close(); } }7.2 消费者1
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DirectCustomer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通道绑定交换机 channel.exchangeDeclare("logs_direct","direct"); //临时队列 String queueName = channel.queueDeclare().getQueue(); //5.绑定交换机和队列 channel.queueBind(queueName,"logs_direct","error"); //6.接收消息 channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close(); } }7.3 消费者2
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DirectCustomer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.通道绑定交换机 channel.exchangeDeclare("logs_direct","direct"); //临时队列 String queueName = channel.queueDeclare().getQueue(); //5.绑定交换机和队列 根据routing key匹配 队列 channel.queueBind(queueName,"logs_direct","error"); channel.queueBind(queueName,"logs_direct","info"); channel.queueBind(queueName,"logs_direct","warning"); //6.接收消息 channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close(); } }8.通配符模式topic
Topic类型的Exchange与Direct相比,都是可以根据Routing key把消息路由到不同的队列。只不过topic类型Exchange可以让队列绑定Routing key的时候使用通配符,这种模型都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert
.* 单个匹配
.# 无限匹配
流程:
8.1 生产者package com.tangguanlin.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TopicProducer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.绑定交换机 channel.exchangeDeclare("logs_topic","topic"); //5.发布消息 String routkey = "user2.save"; channel.basicPublish("logs_topic",routkey,null,"这里是topic通配符模式".getBytes()); //6.释放资源 channel.close(); connection.close(); } }8.2 消费者1
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import org.jdom2.output.StAXStreamOutputter; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TopicCustomer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.定义交换机 channel.exchangeDeclare("logs_topic","topic"); //5.绑定交换机 String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"logs_topic","user1.*"); //6.接收消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close();//7. } }8.3 消费者2
package com.tangguanlin.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TopicCustomer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接mq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip connectionFactory.setPort(5672); //设置端口号 connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机 connectionFactory.setUsername("ems"); //访问虚拟主机的用户名 connectionFactory.setPassword("123"); //访问虚拟主机的密码 //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.获取连接中的通道 Channel channel = connection.createChannel(); //4.定义交换机 channel.exchangeDeclare("logs_topic","topic"); //5.绑定交换机 String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"logs_topic","user2.*"); //6.接收消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2"+new String(body)); } }); //7.释放资源 //channel.close(); //connection.close(); } }9. SpringBoot整合RabbitMQ
springboot和rabbitMQ集成的包
org.springframework.boot spring-boot-starter-amqp
配置文件:application.yml
rabbitmq: host: 127.0.0.1 virtual-host: /ems port: 5672 username: ems password: 123
RabbitTemplate简化 *** 作
9.1 点对点模型 9.1.1 生产者package com.tangguanlin.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/p2p") public void point2pointProducer(){ rabbitTemplate.convertAndSend("p2p","hello world"); } }9.1.2 消费者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RestController; @Component @RabbitListener(queuesToDeclare = @Queue("p2p")) public class RabbitCustomer { @RabbitHandler public void receivePoint2point(String message){ System.out.println("message="+message); } }9.2 工作队列平均消费 9.2.1 生产者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/work") public void workProducer(){ rabbitTemplate.convertAndSend("work","work模型"); } }9.2.2 消费者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RestController; @Component public class RabbitCustomer { @RabbitListener(queuesToDeclare = @Queue("work")) public void receiveWork(String message){ System.out.println("work1 message = " + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void receiveWork2(String message){ System.out.println("work2 message = " + message); } }9.3 发布/订阅模式 9.3.1 生产者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/fanout") public void fanoutProducer(){ rabbitTemplate.convertAndSend("fanout","","发布/订阅模式发送的消息"); } }9.3.2 消费者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RestController; @Component public class RabbitCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时的队列 exchange = @Exchange(value = "fanout",type="fanout") //绑定的交换机 ) }) public void receiveFanout(String message){ System.out.println("fanout1 message ="+ message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时的队列 exchange = @Exchange(value = "fanout",type="fanout") //绑定的交换机 ) }) public void receiveFanout2(String message){ System.out.println("fanout2 message ="+ message); } }9.4 路由模式 9.4.1 生产者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/direct") public void directProducer(){ rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息"); } }9.4.2 消费者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RestController; @Component public class RabbitCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs",type="direct"), //自定义交换机名称和类型 key = {"info"} ) }) public void receiveDirect(String message){ System.out.println("direct1 message = "+ message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs",type="direct"), //自定义交换机名称和类型 key = {"error","warning"} ) }) public void receiveDirect2(String message){ System.out.println("direct2 message = "+ message); } }9.5 通配符模式 9.5.1 生产者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/topic") public void topicProducer(){ rabbitTemplate.convertAndSend("topics","user.insert","user.save路由信息"); } }9.5.2 消费者
package com.tangguanlin.controller; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RestController; @Component public class RabbitCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型 key ={"user.*"} ) }) public void receiveTopic(String message){ System.out.println("topic message = "+ message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型 key ={"user.save"} ) }) public void receiveTopic2(String message){ System.out.println("topic2 message = "+ message); } }10.RabbitMQ的集群 10.1 主从集群 10.2 镜像集群 10.2.1 查看当前策略
rabbitmqctl list_policies
10.2.2 添加策略rabbitmqctl set_policy ha-all “hello” ‘{“ha-mode”:“all”,“ha-sync-mode”:“automatic”}’
说明:策略正则表达式为 ^ 表示匹配所有队列名称 ^hello:匹配hello开头队列
10.2.3 删除策略rabbitmqctl clear_policy ha-all
r.save路由信息");
}
}
### 9.5.2 消费者
package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
@Component
public class RabbitCustomer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型 key ={"user.*"} ) }) public void receiveTopic(String message){ System.out.println("topic message = "+ message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型 key ={"user.save"} ) }) public void receiveTopic2(String message){ System.out.println("topic2 message = "+ message); }
}
# 10.RabbitMQ的集群 ## 10.1 主从集群 [外链图片转存中...(img-HBPlSjoh-1634474231305)] ## 10.2 镜像集群 [外链图片转存中...(img-im8DCxmZ-1634474231306)] ### 10.2.1 查看当前策略 rabbitmqctl list_policies ### 10.2.2 添加策略 rabbitmqctl set_policy ha-all "hello" '{"ha-mode":"all","ha-sync-mode":"automatic"}' 说明:策略正则表达式为 ^ 表示匹配所有队列名称 ^hello:匹配hello开头队列 ### 10.2.3 删除策略 rabbitmqctl clear_policy ha-all
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)