MQ全称为Message Queue,即消息队列;RabbitMQ (Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列;
常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,metaMQ,RocketMQ、Redis(也可做消息队列)
1.1 消息队列“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。
优势:
1)解耦:系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可
2)异步:如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能
3)削峰:如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
1.2 AMQP 是什么AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计。其协议主要分成两层:
功能层(Functional Layer):定义一组使用命令
传输层(Transport Layer):基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时可以处理信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。
这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,基于此实现不同的上层协议。
主要组件:
AMQ 作为中间层服务,把消息生产和消费分隔开来,当消息生产者出现异常,不影响消费者对消息的消费,当消费者异常时,生产者生产的消息可以存放到服务的内存或者磁盘,不会影响到消费的速率,同时,消息也可以基于路由的规则可以投递到指定的消费者消费。
交换机类型:
1)直连型
直连交换机是根据Routing key来将消息路由到具体的queue;
Routing key=booking 同时绑定了Queue1和Queue2,所以当生产者发送消息P到直连交换机,指定的RoutingKey为booking时,直连交换机发现能匹配到与之绑定的有Queue1和Queue2,这时候交换机会将这个消息P转发到Queue1和Queue2上。
Routing key=create和Routing key=/confirm/i只与Queue2做了绑定,所以当生产者发送消息P到直连交换机,指定的RoutingKey为create或者/confirm/i时,直连交换机只能匹配到与之绑定的Queue2,所以这个消息只会被Queue2消费到;
2)扇形
生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。
3)主题交换机
当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。
4)头交换机
2 RabbitMQ 2.1 安装headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
2.2 组件介绍docker pull rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbmitq 容器ID
地址:IP+默认端口15672
默认账号、密码:guest
Producter : 消息生产者,负责将消息发送到MQ
Consumer : 消息消费者,接收MQ转发的消息
Connection : 连接通道,包含信道channel
Broker : 消息队列服务进程,包含Exchange和Queue
Exchange : 消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过滤
Queue : 消息队列;存储消息的队列,消息到达队列并转发给指定的消费方
消息发布流程:
2.3 编码实现发布消息 :
1. 生产者和Broker建立TCP连接
2. 生产者和Broker建立通道
3. 生产者通过通道将消息发送给Broker,由Exchange将消息进行转发
4. Exchange将消息转发到指定Queue
接收消息 :
1.消费者和Broker建立TCP连接
2.消费者和Broker建立通道
3.消费者监听指定的Queue
4.当有消息到达Queue时Broker默认将消息推送给消费者。
5.消费者接收到消息。
队列持久化:在声明队列的时候,将第二个参数设置完true,则可以完成队列持久化的设置,队列的信息会保存到磁盘中。当rabbitMQ重启,则会恢复之前的存在的队列。
channel.queueDeclare(QUEUE,true,false,false,null);
消息持久化:在生产者发送消息时,可以通过第三个参数设置消息属性,将消息声明为持久化。则可以完成将消息写入磁盘。当rabbitmq重启,在队列恢复的同时也会一并恢复该队列中设置持久化属性且未被消费的消息。
String message = "hello RabbitMQ"; channel.basicPublish("",QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
以下内容转自:超详细的RabbitMQ入门,看这篇就够了!-阿里云开发者社区
org.springframework.boot spring-boot-starter-amqp
公共配置类:
application.yml文件加上RabbitMQ的配置信息:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
生产者:
@Configuration public class DirectRabbitConfig { @Bean public Queue rabbitmqDemoDirectQueue() { return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false); } @Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交换机 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false); } @Bean public Binding bindDirect() { //链式写法,绑定交换机和队列,并设置匹配键 return BindingBuilder //绑定队列 .bind(rabbitmqDemoDirectQueue()) //到交换机 .to(rabbitmqDemoDirectExchange()) //并设置匹配键 .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING); } }
发送消息的Service类:
@Service public class RabbitMQServiceImpl implements RabbitMQService { //日期格式化 private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Resource private RabbitTemplate rabbitTemplate; @Override public String sendMsg(String msg) throws Exception { try { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); String sendTime = sdf.format(new Date()); Mapmap = new HashMap<>(); map.put("msgId", msgId); map.put("sendTime", sendTime); map.put("msg", msg); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } }
使用Controller层进行发送:
@RestController @RequestMapping("/mall/rabbitmq") public class RabbitMQController { @Resource private RabbitMQService rabbitMQService; @PostMapping("/sendMsg") public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception { return rabbitMQService.sendMsg(msg); } }
消费者:
参考文章:
深入理解AMQP协议_My Blogs-CSDN博客_amqp
RabbitMQ 工作原理和常用工作模式 --贺兰的博客_贺兰山的那个脉的博客-CSDN博客
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)