在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
常见MQ的类型4、不应使用MQ的场景:调用方实时依赖执行结果(上游需要关注执行结果)的业务场景,也就是上游实时关注执行结果
功能特性异步、解耦、流量削峰
- 是一个独立运行的服务。生产者发送消息,消费者接受消息,需要先跟服务器建立连接;
- 采用消息队列作为数据结构,有先进先出的特点;
- 具有发布订阅(publish/subscribe)的模型,消费者可以获取自己需要的消息。
- 运维成本高
- 系统可用性降低
- 系统复杂性提高
- 数据驱动的任务依赖
- 上游不关心多下游执行结果
- 异步返回执行时间长
AMQP(Advanced Message Queuing Protocol)
Server又称Broker,接收客户端连接,实现AMQP实体服务
Connection:连接,应用程序与Broker的网络连接
Channel网络信道,几乎所有的 *** 作都是在这上面进行,是消息读写的通道,客户端可以建立多个,每个代表一个会话。
Message消息,服务器与应用程序之间传送数据的载体,由Properties和Body组成。Properties可以对消息进行修饰,比如优先级、延迟等高级特性,Body是消息体内容。
Vhost虚拟主机,用于逻辑隔离,最上层的消息路由。可以有多个,一般由项目模块划分
Exchange BindingExchange与Queue之间的虚拟连接,binding中可以包含routing key
RoutingKey一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息
QueueMessage Queue,消息队列,保存消息并将它们转发给消费者
JMS认识-
MQ实现参照了jms规范,(规范就是一种约定)该规范中包括
-
提供者:实现jms规范的中间件服务器
-
客户端:发送或者接受消息的应用程序
-
生产者/发布者:创建并发送消息的客户端
-
消费者/订阅者:接受并处理消息的客户端
-
消息:应用程序之间传递的内容
-
消息模式:在客户端之间传递消息的方式,jms中定义了主题和队列两种模式
主题模式:假如发布者发布了100条消息,那么如果有n个订阅者,每个订阅者都可以获取到100条消息。
队列模式:假如生产者发送了100条消息,如果有n个消费者,那么每个消费者加起来获取到的消息总数是100,一个消息只能被一个消费者消费。
-
优点
解耦:上下游逻辑+物理解耦,除了与MQ有物理连接,模块之间都不相互依赖;将消息写入消息队列,新增一个下游消息关注方,上游不需要修改任何代码
异步:上游执行时间短,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
-
缺点
系统可用性降低:系统更复杂,多了一个MQ组件
消息传递路径更长,延时会增加
消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证
上游无法知道下游的执行结果,这一点是很致命的
-
什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
- Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑
- ConnectionFactory为Connection的制造工厂
- Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务 *** 作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
- Message acknowledgment:消息确认,消费者再消费完一条消息后会发送一个回执确认已经消费的信息到MQ,MQ则将该消息从队列中移除
- Message durability:消息持久化,到MQ服务宕机时保证未被消费的消息不丢失。
- Prefetch count:预取计数,消息一条一条推送,消费完一条再推送一条。
- Exchange:交换器,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
- routing key:路由选择键,在发送消息给Exchange时,通过指定routing key来决定消息流向哪里,与Exchange Type及binding key联合使用才能最终生效
- Binding:RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了
- …
http://tryrabbitmq.com/ 模拟画图工具
Direct直连交换机,使用明确的绑定键,适用于业务明确的场景。一个队列与直连类型交换机绑定,需要指定一个明确的绑定键(binding key),生产者发送消息时会携带一个路由键(routing key)。当消息的路由键与某个队列的绑定键完全匹配时,消息才会从交换机路由到这个队列上,多个队列也可以使用相同的绑定键。消费者可以从队列中获取消息进行消费。
示例代码(原生) pom引入依赖
消费者com.rabbitmq amqp-client5.6.0
import com.rabbitmq.client.*; import java.io.IOException; public class MyConsumer { // 交换机名称 public final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE"; // 队列名称 public final static String QUEUE_NAME = "SIMPLE_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 连接IP factory.setHost("8.134.132.250"); // 默认监听端口 factory.setPort(5672); // 虚拟机 factory.setVirtualHost("/"); // 设置访问的用户 factory.setUsername("guest"); factory.setPassword("guest"); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); // 声明交换机 // String exchange, String type, boolean durable, boolean autoDelete, Map生产者arguments channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null); // 声明队列 // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" Waiting for message...."); // 绑定队列和交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "tiger.best"); // 创建消费者(依据信道channel来创建 ) Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); System.out.println("consumerTag : " + consumerTag); System.out.println("deliveryTag : " + envelope.getDeliveryTag()); } }; // 开始从队列中获取消息 // String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, true, consumer); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MyProducer { // 交换机名称 private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE"; public static void main(String[] args) throws Exception { // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); // 连接IP factory.setHost("8.134.132.250"); // 连接端口 factory.setPort(5672); // 虚拟机 factory.setVirtualHost("/"); // 用户 factory.setUsername("guest"); factory.setPassword("guest"); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); // 发送消息 String msg = "Hello world, Rabbit MQ!!!"; // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE_NAME, "tiger.best", null, msg.getBytes()); // 关闭,释放资源 channel.close(); conn.close(); } }代码示例(死信队列)
实订单延时关闭,消息过期属性(x-message-ttl),通过设置这个属性,超过指定时间后,消息会被丢弃;通过
消费者package com.tiger.dlx; import com.rabbitmq.client.*; import com.tiger.util.ResourceUtil; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DlxConsumer { // 死信交换机 public final static String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE"; // 死信队列 public final static String DEAD_LETTER_QUEUE = "DEAD_LETTER_QUEUE"; // 正常队列 public final static String ORI_USE_QUEUE = "ORI_USE_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); // 指定队列的死信交换机 Map生产者arguments = new HashMap (); arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 声明队列(默认交换机AMQP default,Direct),并指定死信交换机 channel.queueDeclare(ORI_USE_QUEUE, false, false, false, arguments); // 声明死信交换机 channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "topic", false, false, false, null); // 声明死信队列 channel.queueDeclare(DEAD_LETTER_QUEUE, false, false, false, null); // 绑定死信交换机和死信队列,设置为 #,无条件接收 channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "#"); System.out.println(" Waiting for message...."); // 创建消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("Received message : '" + msg + "'"); System.out.println("consumerTag : " + consumerTag); System.out.println("deliveryTag : " + envelope.getDeliveryTag()); } }; // 监听正常队列 // channel.basicConsume(ORI_USE_QUEUE, true, consumer); // 开始获取消息,监听死信队列 channel.basicConsume(DEAD_LETTER_QUEUE, true, consumer); } }
package com.tiger.dlx; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.tiger.util.ResourceUtil; public class DlxProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ, DLX MSG"; // 设置属性,消息10秒钟过期 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() // 持久化消息 .deliveryMode(2) .contentEncoding("UTF-8") // TTL 10秒过期 .expiration("10000") .build(); // 发送消息,默认交换机 channel.basicPublish("", DlxConsumer.ORI_USE_QUEUE, properties, msg.getBytes()); channel.close(); conn.close(); } }消费端限流
channel.basicQos(prefetchCount);
import com.rabbitmq.client.*; import com.tiger.util.ResourceUtil; import java.io.IOException; import java.util.concurrent.TimeUnit; public class LimitConsumerSlow { private final static String QUEUE_NAME = "TEST_LIMIT_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 final Channel channel = conn.createChannel(); // 声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("LimitConsumerSlow Waiting for message...."); // 创建消费者,并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); try { // 睡眠5秒钟 TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("LimitConsumerSlow Received message : '" + msg + "'"); channel.basicAck(envelope.getDeliveryTag(), true); } }; // 非自动确认消息的前提下, // 如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前, // 不进行消费新的消息。因为LimitConsumerSlow的处理速率很慢, // 收到两条消息后都没有发送ACK,队列不会再发送消息给Consumer2 channel.basicQos(2); channel.basicConsume(QUEUE_NAME, false, consumer); } }Topic
主题交换机,一个队列与主题类型交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:
#代表0个或多个单词
*代表不多不少一个单词
单词指的是用英文的点“.”隔开的字符。例如 hello.tiger.yes 是3个单词
适用于一些根据业务主题或消息登记过滤的场景,比如或一条消息可能和资金有关,又和风控有关,那么就可以设置一个多级路由键。第一个单词和资金有关,第二个单词和风控有关。
Fanout广播交换机,队列与交换机不需要绑定键,因此生产者发送消息到交换机也不用携带路由键,当消息到达交换机时,所有与之绑定的队列都会收到相同的消息副本。
消息可靠投递 事务模式import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.tiger.util.ResourceUtil; public class TransactionProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ"; // 声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { // 开启事务模式 channel.txSelect(); // 发送消息,发布了4条,但只确认了3条 channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // 提交事务 channel.txCommit(); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // 模拟异常,消息投递不成功 int i = 1 / 0; channel.txCommit(); System.out.println("消息发送成功"); } catch (Exception e) { // 消息回滚 channel.txRollback(); System.out.println("消息已经回滚"); } channel.close(); conn.close(); } }确认模式 Confirm 普通确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.tiger.util.ResourceUtil; public class Normal/confirm/iProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ ,Normal /confirm/i"; // 声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 开启发送方确认模式 channel./confirm/iSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); // 普通/confirm/i,发送一条,确认一条 if (channel.waitFor/confirm/is()) { System.out.println("消息发送成功"); } else { System.out.println("消息发送失败"); } channel.close(); conn.close(); } }批量确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.tiger.util.ResourceUtil; public class Batch/confirm/iProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ ,Batch /confirm/i"; // 声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { channel./confirm/iSelect(); for (int i = 0; i < 5; i++) { // 发送消息 channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); } // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了 // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到) // 直到所有信息都发布,只要有一个未被Broker确认就会IOException channel.waitFor/confirm/isOrDie(); System.out.println("消息发送完毕,批量确认成功"); } catch (Exception e) { // 发生异常,可能需要对所有消息进行重发 // TODO e.printStackTrace(); } channel.close(); conn.close(); } }异步确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client./confirm/iListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.tiger.util.ResourceUtil; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class Async/confirm/iProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ, Async /confirm/i"; // 声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 用来维护未确认消息的 deliveryTag final SortedSet代码实战案例confirmSet = Collections.synchronizedSortedSet(new TreeSet ()); // 异步监听确认和未确认的消息 // 这里不会打印所有响应的ACK:ACK可能有多个,有可能一次确认多条,也有可能一次确认一条 // 如果要重复运行,先停掉之前的生产者,清空队列 channel.add/confirm/iListener(new /confirm/iListener() { public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未确认消息,标识:" + deliveryTag); if (multiple) { // headSet表示后面参数之前的所有元素,全部删除 /confirm/iSet.headSet(deliveryTag + 1L).clear(); } else { /confirm/iSet.remove(deliveryTag); } // 这里添加重发的方法 } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认 System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple)); if (multiple) { // headSet表示后面参数之前的所有元素,全部删除 /confirm/iSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一个元素 /confirm/iSet.remove(deliveryTag); } System.out.println("未确认的消息:" + /confirm/iSet); } }); // 开启发送方确认模式 channel./confirm/iSelect(); for (int i = 0; i < 10; i++) { long nextSeqNo = channel.getNextPublishSeqNo(); // 发送消息 channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); /confirm/iSet.add(nextSeqNo); } System.out.println("所有消息:" + /confirm/iSet); // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK //channel.close(); //conn.close(); } }
https://gitee.com/tiger2050/springboot-mq-demo
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)