1.RabbitMQ的介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息的中间件)。AMQP(Advanced Message Queuing Protocol)高级消息队列协议:客户端向MQ发送的消息协议是AMQP协议。JMS(Java Message Server)一样,都是一种消息规范,相比而言可定是AMQP高级一些。
2.MQ的功能
1)异步处理:把用户的请求发送给消息中间件后,消息中间件会先进行局部响应,不需要用户等待。
2)应用解耦:模块和模块之间没有强耦合,都是通过某个中间件进行连接的
3)流量消峰:在某一段时间内,突然发生了高并发,如果某个服务器承受不住,那么添加中间件就会将当前的请求存储到队列中,一个一个处理,不会全部直接请求服务器保证了服务器的安全性。
没有添加消息中间件:
添加消息中间件
AMQP和JMS区别:
JMS是定义一套java接口,对消息处理进行了统一 *** 作,但是AMQP是通过规定协议的方式统一数据交互的。JMS必须使用java语言 *** 作,AMQP只是协议,不是实现代码,跨语言的。
JMS中只定义了两种消息模式,但是AMPQ消息模式很多,很丰富。
使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、ZeroMQ、kafka、metaMQ、RocketMQ
ActiveMQ:apache公司的
Kafka:一般大数据领域应用多,吞吐量达,不用考虑数据安全性
RecketMQ:是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,安全性能好。(推荐使用RabbitMQ,RabbitMQ用的是AMQP协议)。
RabbitMQ管理平台默认地址:http://localhost:15672
默认用户名和密码是:guest和guest,最好不要用整个用户 *** 作。
1、使用guest登陆后,选择admin,创建一个新用户xxx。
2、创建虚拟节点/hello
3、添加成功
4、设置虚拟节点的权限(给哪个用户用)
RabbitMQ提供了五种消息模型(其实一共7种,第六种RPC远程服务调用,第七种发布确认)以下只解释了五种,基本消息模式,工作队列模式,Fanout订阅模式,Direct订阅模式,Topic订阅模式。
编写客户端连接RabbitMQ服务器 生产者与消费者模式(一对一)1.创建工程
2.导入RabbitMQ依赖
com.rabbitmq amqp-client5.9.0
3.编写连接工具类
public class RabbitMQUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置连接地址 我使用的windows 所以是localhost connectionFactory.setHost("localhost"); //3.端口号 ,因为是java客户端,所以用amqp协议 connectionFactory.setPort(5672); //4.设置用户名,密码 , 虚拟节点名 connectionFactory.setVirtualHost("/hello"); connectionFactory.setUsername("Gyz"); connectionFactory.setPassword("123123"); Connection connection = connectionFactory.newConnection(); return connection; } }
4.端口号解释
5.编写案例:Hello World
1.如果先运行消息的提供方,在运行消息的消费方,谁优先启动,谁就直接获取了所有的消息
2.如果我们先把两个消费者方启动后,监听消息的提供方,提供数据的时候,那么我们两个消费方就会均匀的拿取数据,因为RabbitMQ属于轮询方式发送消息,所以RabbitMQ会自动把数据分配好
3.如果我们先把两个消费者方启动后,在提供数据,有一个消费者出现了阻塞状态,可以设置成能者多劳的模式,要出现这个模式的前提要关闭ack自动确认机制,在使用channel.basicQos(1),设置每次拿一个消息,消费一个后再去拿第二个,当出现阻塞的时候就不会出现第二种情况
能者多劳模式的运行结果
消费者1 接收的信息为: 第1条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第3条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第5条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第6条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第8条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第9条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第11条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第12条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第14条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第15条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第17条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第18条: 地瓜地瓜 我是土豆 消费者1 接收的信息为: 第20条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第2条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第4条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第7条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第10条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第13条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第16条: 地瓜地瓜 我是土豆 消费者2 接收的信息为: 第19条: 地瓜地瓜 我是土豆 消费者2消费的少,这样资源就合理利用,不浪费, 用到极致发布订阅(Publish/Subscribe)
1、1个生产者,多个消费者 2、每个消费者都有自己的队列 3、生产者没有将消息直接发送到队列中,是直接发送到转换机exchange中 4、每个队列都需要绑定转换机 5、生产者发送的消息,经过交换机到达队列,实现一个消息被国歌消费者消费(Fanout分发)
exchange:交换机一方面是接收生产这发送的消息,另一方面他自己知道如何让处理消息,例如将消息交给队列,或者将某些消息丢弃。
Fanout(分发广播):将消息交给所有绑定到交换机的队列//1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建一个通道 Channel channel = conn.createChannel(); //3.声明一个交换机 交换机处理消息的类型是什么: fanout , Direct , Topic //参数一: 交换机的名字, 参数二: 处理消息的类型 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String message = "地瓜地瓜 我是土豆"; //参数一: 交换机的名字 ,参数二:路由标识 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); //发布数据 System.out.println("消息已发送: "+ message); //==================== //1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建通道 Channel channel = conn.createChannel(); //3.创建一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //4.从通道中拿去数据 channel.queueDeclare(QUEUE_NAME,false,false ,false,null); //5.将队列 绑定到 交换机上 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //匿名类: 从通道中 获取数据 //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认 channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){ //重写方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str =new String(body); System.out.println("接收的信息为1 : "+str); } });
总结:分发是先声明交换机,再将指定的队列绑定到交换机上
Direct(选择定向):将消息发送给指定的Routing Key的队列功能:各取所需,按需分配,选择性的接收消息。
在订阅模式中,发布者发布消息后,订阅者可以使用选择性的获取所需要的消息,这里面需要添加一个标识,就是路由Routing Key这样就可以分开了。
p:生产者,向exchange交换机中发送消息,发送消息时带有一个Routing Key 路由 X:交换机,接受生产者发布的消息,然后把消息传递给符合Routing Key路由匹配的队列 C1:消费者1,所绑定的队列按照路由标识去消费,按照图片解释,消费orange C2:消费者2 所绑定的队列按照路由标识去消费,按照图片解释,消费black,green
部分代码
=====provider //1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建一个通道 Channel channel = conn.createChannel(); //3.声明一个交换机 交换机处理消息的类型是什么: fanout , Direct , Topic //参数一: 交换机的名字, 参数二: 处理消息的类型 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String message = "橘子橘子 我是苹果"; //参数一: 交换机的名字 ,参数二:路由标识orange channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes()); //发布数据 System.out.println("消息已发送: "+ message); ============================consumer //1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建通道 Channel channel = conn.createChannel(); //3.创建一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //4.从通道中拿去数据 channel.queueDeclare(QUEUE_NAME,false,false ,false,null); //5.将队列 绑定到 交换机上 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"orange"); //匿名类: 从通道中 获取数据 //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认 channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){ //重写方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str =new String(body); System.out.println("接收的信息为1 : "+str); } }); }
总结:消费者只能消费自己绑定的路由键的数据,获取不到没绑定的
Topic(通配符):根据模糊查询确定发送到哪个队列
Topic类型的Exchange与Direct相比,都是根据RoutiingKey把消息路由到不同的队列中的,只不过是Topic类型Exchange交换机的路由RoutingKey可以使用通配符
#:匹配一个或多个词 *:匹配一个词
部分代码
======provider //1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建一个通道 Channel channel = conn.createChannel(); //3.声明一个交换机 交换机处理消息的类型是什么: fanout , Direct , Topic //参数一: 交换机的名字, 参数二: 处理消息的类型 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String message = "绿色绿色"; //参数一: 交换机的名字 ,参数二:路由标识orange channel.basicPublish(EXCHANGE_NAME,"com.oracle.pojo.haha",null,message.getBytes()); //发布数据 System.out.println("消息已发送: "+ message); ===========consumer //1.获取连接 Connection conn = RabbitMQUtil.getConnection(); //2.创建通道 Channel channel = conn.createChannel(); //3.创建一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //4.从通道中拿去数据 channel.queueDeclare(QUEUE_NAME,false,false ,false,null); //5.将队列 绑定到 交换机上 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"com.oracle.*"); //匿名类: 从通道中 获取数据 //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认 channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){ //重写方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str =new String(body); System.out.println("接收的信息为1 : "+str); } });死信队列
简称:DLX(dead-letter-exchange)就是单独的创建一个队列,这个队列中存储的就是过期的消息和被拒绝的消息。
消息变成死信的情况:
- 消息被拒绝时,或者requeue=false,重新入队失败
- 消息的TTL(Time-To-Live)过期
- 队列达到最大长度时,信息存储不进去
死信消息的处理过程: - DLX也是一个正常的Exchange交换机,和一般的交换机没有区别,它能在任何队列上绑定,实际上就是设置某个队列的属性;
- 当前这个队列中有死信时,RabbitMQ会自动将这个消息发布到设置好的Exchange上去,就是被路由到另一个交换机上
- 可以监听这个队列中的信息
消息自动确认,如果时work Queue,形式要取消自动确认,变成手动
SpringBoot整合RabbitTemplate配置yml文件:
spring: application: name: springboot_rabbitmq #当前项目名 rabbitmq: virtual-host: /hello #虚拟节点 host: localhost port: 5672 username: xxx password: 123123 listener: direct: acknowledge-mode: manual #设置消息开启手动确认
RabbitConfig类
package com.oracle.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { workQueue模式 , 创建一个队列 @Bean public Queue queueWork1(){ return new Queue("workQueueBoot"); } }
发送类
@Component public class RabbitUtil { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ for(int i = 1; i <= 20 ;i++){ rabbitTemplate.convertAndSend("workQueueBoot","SpringBoot测试RabbitMQ: "+ i); } } }
监听类
package com.oracle.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MessageReceiveListener { //消费者1 @RabbitListener(queues = "workQueueBoot") public void receiveMessage1(String msg, Channel channel, Message message) throws IOException { //只包含发送的消息 System.out.println("消费者1收到消息: " + msg); //channel 代表通道信息 //message 附加的参数信息 } //消费者2 @RabbitListener(queues = "workQueueBoot") public void receiveMessage2(Object obj,Channel channel,Message message){ //obj 是所有消息 System.out.println("消费者2收到消息: "+ obj); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)