- RabbitMQ
- 前言
- 一、RabbitMQ是什么?
- 二、应用场景
- 1. 异步处理
- 2. 应用解耦
- 3. 削峰填谷
- 三、基础概念
- 1. 组件介绍
- 1.1 Broker
- 1.2 Virtual Host
- 1.3 Exchange
- 1.4 queue
- 1.5 Message
- 1.6 Connection
- 1.7 Channel
- 1.8 Binding
- 2. ExChange类型
- 2.1 Direct
- 2.2 Fanout
- 2.3 Topic
- 四、代码实现
- 1. 依赖引入
- 2. 文件配置
- 3. 绑定交换机和路由
- 4. 创建生产者
- 5. 创建消费者
前言
消息中间件是一种利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成。通过消息队列模型和消息传递机制,可以在分布式环境下进行扩展和通信。
一、RabbitMQ是什么?
RabbitMQ是一个基于elang开发的AMQP(即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。)开源的实现
二、应用场景 1. 异步处理场景说明:用户下单之后,后续会关联到库存系统以及物流系统的 *** 作,传统的做法就是使用串行的方式
串行:用户在订单系统中进行下单 *** 作,下单完成之后,会流转到库存系统,库存系统在会判断库存是否充足,是否可以下单成功。下单成功的情况大概需要102ms
消息中间件:用户在订单系统中进行下单 *** 作,下单完成之后到库存系统中进行核对,如果库存不足则直接返回下单失败,如果库存充足,则在进行库存减一之后将后续的 *** 作提交到消息队列中,先返回下单成功,后续 *** 作交由消息队列进行异步处理。此时不论成功还是失败消耗时间都是52ms左右,相比较串行为用户提前了约一半的时间返回下单结果,优化了用户的体验。
场景说明:用户下单之后,订单系统需要通知库存系统,传统的做法就是订单系统直接调用库存系统的接口
这个做法有一个缺点:
- 当库存系统出现故障的时候,订单就会失败
- 订单系统和库存系统高度耦合
引入消息队列
- 订单系统:用户下单之后,订单系统完成持久化处理,将消息写入到消息队列中,返回用户下单成功
- 库存系统:订阅下单的消息,获取下单消息进行库 *** 作。就算此时库存系统出现了故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(保证了订单不会丢失)
场景说明:在一些瞬时并发量比较大的场景下,例如秒杀活动,可能会出现瞬时请求量过大,造成数据库或者服务器的崩溃。所以可以在活动的前端加一个消息中间件,用来限制流量的流速
- 用户的请求,服务器接收到之后,先写入到消息队列中,若队列的长度超过最大值,则直接抛弃用户,给用户返回一个错误界面
- 保存下来的秒杀请求,进行正常处理
简单来说就是消息队列服务器实体,接受客户端连接,一个Broker里面可以由多个Virtual Host,用作不同用户的权限分离
1.2 Virtual Host每个RabbitMQ服务器都能够创建虚拟消息服务器,我们称之为虚拟主机。每个virtual host本质上是一个mini版的RabbitMQ服务器,拥有自己的交换机、队列、绑定等,拥有自己的权限机制。virtual host之于RabbitMQ就像虚拟机之于物理机的概念。他们通过在各个实例间提供逻辑上分离,允许为不同的应用程序安全保密的运行数据,这很有用,它既能将同一个RabbitMQ的众多客户区分开来。又可以避免队列和交换机的命令冲突。RabbitMQ提供了开箱即用的默认的虚拟主机“/”,如果不需要多个virtual host可以直接使用这个默认的virtual host,通过使用缺省的guest用户名和guest密码来访问默认的virtual host。
1.3 Exchange接收生产者发送的消息,并根据binding规则将消息路由给服务器中的队列,交换机常见有4种类型,direct,fanout,topic,headers。不同的交换机类型,路由的规则也是不同的。
1.4 queue消息队列,用来接收还未被消费者消费的信息
1.5 Message由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接收、优先级是多少等。而Body是真正需要传输的APP数据
1.6 Connection连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接
1.7 Channel信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要和Broker交互,如果每个线程都需要建立一个TCP连接,建立之后再销毁是十分浪费资源的。RabbitMQ建议客户端线程之间不要共用Channel,至少需要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
1.8 Binding路由,用于Exchange和Queue之间建立联系的
2. ExChange类型 2.1 Direct消息中的路由键(routing key)如果完全和Binding中的binding key完全一致,交换机就将消息发送到对应的队列上
举个例子:
可能出现的情况:
- 生产者发送到交换机中的消息的路由键是orange,则匹配orange路由规则,进入Q1消息队列等待消费者消费
- 生产者发送到交换机中的消息的路由键是black、green,则匹配到black或者green的路由规则,进入Q2消息队列等待消费
- 生产者发送到交换机中的消息的路由键既orange也不是black也不是green,那么该消息会被丢弃
从上图可以看出,这种策略,将忽略所谓的routing key,将消息分发到所有的绑定的Queue上,也即广播模式
Topic交换器则是通过模糊匹配的方式进行路由的,如果消息中的路由键满足routing key定义的匹配规则,则会进入到对应的队列中,Topic交换器中的消息和队列的关系,既可以是一对一,也可以是一对多。该类型的交换机中的binding key中的*表示匹配一个单词,#表示匹配0个或者多个单词
举个例子:
- Q1的binding key是*.orange.*,意味消息中的路由key必须是由三个单词组成的,并且中间的那个必须是orange,否则消息不会进入到该binding key对应的队列
- Q2的binding key是*.*.rabbit和lazy.#意味着消息中的路由key必须是三个单词组成的,并且最后一个单词是rabbit,或者是以lazy开头的单词
- 产生一个test.orange.mm消息,会被路由到Q1中;如果是test.orange则不会被路由到任何一个队列中
- 产生一个test.qq.rabbit或者lazy都可以分发到Q2中
- 如果产生一个test.orange.rabbit消息,则会被分发到Q1和Q2中
2. 文件配置org.springframework.boot spring-boot-starter-amqp2.5.3
spring: rabbitmq: host: 82.157.60.144 port: 5672 username: guest password: guest listener: simple: concurrency: 2 max-concurrency: 2 publisher-/confirm/is: true virtual-host: /3. 绑定交换机和路由
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { // 队列 @Bean public Queue TestDirectQueue(){ // durable:是否持久化,默认是false,持久化的队列:会被存储到磁盘上,当消息代理重启时任然存在 暂存队列:只有当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭之后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会被自动删除 return new Queue("TestDirectQueue",true,true,false); } // 交换机 @Bean DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false); } // 绑定 @Bean Binding TestBinding(){ return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }4. 创建生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController @RequestMapping("rabbit") public class TestRabbitMQSend { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("sendSOS") public String sendSOS(){ String value = String.valueOf(UUID.randomUUID()); String msg = "this is a new message"; String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map5. 创建消费者map = new HashMap<>(); map.put("messageId",value); map.put("message",msg); map.put("time",format); //将消息携带绑定值发送到交换机中 rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting",map); return "ok"; } }
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString()); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)