- RabbitMQ 使用场景
- 服务解耦
- 流量削峰
- 异步调用
- 基本概念
- Exchange-交换机
- Queue-消息队列
- Binding Key-绑定键
- Routing Key-路由键
- 六种工作模式
- 简单模式——(寄信)——消息具有唯一性
- 工作模式——(卖家发快递)——消息具有唯一性
- 消息确认——手动回执ACK
- 合理地分发——只接受1条消息
- 消息持久化——队列持久化,消息持久化
- 发布订阅模式——(发广播)——消息克隆
- 路由模式——(可以调频接收的广播)——消息克隆
- 生产者配置:
- 消费者配置:
- 主题模式——(可以调频接收的广播)——消息克隆
- 交换机声明
- RPC模式
- 客户端代码:
- 服务端代码:
常规的微服务调用:
RabbitMQ解耦的情况:
高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
请求放入RabbitMQ中后,不管后续。直接继续跑
RabbitMQ对接下游服务,慢慢消化。实现异步
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化 *** 作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
根据Binding规则将消息 路由 给服务器中的 队列
ExchangeType决定了 路由消息的行为
常用类型有:
direct、Fanout、Topic
我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中,等待消费者来取。
(如果路由找不到相应的queue则数据会丢失)
它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。
Routing Key-路由键生产者发送的,来指定这个消息的路由规则。
这个routing key需要与Exchange Type及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里。
生产者就是发信人 ——> rabbitmq就是邮政 ——> 消费者就是收信人。
- 消息向默认交换机发送
- 默认交换机隐含与所有队列绑定
- routing key 即为队列名称
因此
exchange参数:为空串
routingKey参数:对于默认交换机,路由键就是目标队列名称
生产者就是卖家 ——> rabbitmq就是快递 ——> 消费者就是买家。
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者。
工作模式,就是简单模式在多个消费者情况下的处理方式。需要消息确认(消费者挂了给其他人),需要合理分发(不回执不发送),需要持久化(防止rabbitmq挂掉)
消费者配置中:
在处理消息的回调对象中,
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//发送回执。
接受消息的
channel.basicConsume("helloworld", f==alse==, callback, cancel);
一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?
我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者
所以我们需要手动回执。
消费者执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。
消费者配置中:
channel.basicQos(1);//一次只接收一条消息
我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者
消息持久化——队列持久化,消息持久化当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
生产者,消费者者配置中:
//第二个参数是持久化参数durable ch.queueDeclare("helloworld", ==true==, false, false, null);//队列持久化
额外属性设置为:
MessageProperties.PERSISTENT_TEXT_PLAIN//消息持久化
生产者需要发送一个路由键,用来指定我们声明的fanout交换机。
fanout交换机,和消费者队列形成绑定。执行群发
在订阅模式下,消费者不再绑定队列,而是每个消费者都有一个属于自己的队列。通过这个队列绑定交换机
生产者配置:
定义交换机
channel.exchangeDeclare("logs", "fanout");//定义名字为logs的交换机,交换机类型为fanout
发送消息,指定路由键即可。
//第一个参数,向指定的交换机发送消息 //第二个参数,不指定队列,由消费者向交换机绑定队列 //如果还没有队列绑定到交换器,消息就会丢失, //但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。 ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
消费者配置:
这里
channel.queueBind(queueName, "logs", "");//这里logs是交换机名,路由键不路由键都无所谓。路由模式——(可以调频接收的广播)——消息克隆
前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。
我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。
生产者配置:channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//声明交换机名
ch.basicPublish("direct_logs", level, null, msg.getBytes());//level为路由键。只能由绑定这个路由的,路由键匹配的接收消费者配置:
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//消费者也声明这个交换机
channel.queueBind(queueName, "direct_logs", level); //fanout绑定交换机就够了,direct模式则还需要声明本消费者对应的序列的路由键主题模式——(可以调频接收的广播)——消息克隆
虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。
Topic和路由模式基本一样,差别在与路由键可以匹配通配符 *. 和 .#
#:可以匹配路由键中 0-多个 单词
*:可以匹配路由键中 单 个单词
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//交换机需要改变RPC模式
如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为 远程过程调用 ,即RPC
因此:这里我们不再理解为生产者消费者。而是客户端服务端
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址,这里使用默认队列。
并且:
要考虑一个问题,响应消息在一个回调队列中,我们如何分辨这个响应是哪个请求的?
这时候我们需要一个唯一标识来标记每个请求——关联ID (correlationId)
package rabbitmq.rpc; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCClient { Connection con; Channel ch; public RPCClient() throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); con = f.newConnection(); ch = con.createChannel(); } public String call(String msg) throws Exception { //自动生成对列名,非持久,独占,自动删除 String replyQueueName = ch.queueDeclare().getQueue(); //生成关联id String corrId = UUID.randomUUID().toString(); //设置两个参数: //1. 请求和响应的关联id //2. 传递响应数据的queue BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数 ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8")); //用来保存结果的阻塞集合,取数据时,没有数据会暂停等待 BlockingQueue服务端代码:response = new ArrayBlockingQueue (1); //接收响应数据的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据 if (message.getProperties().getCorrelationId().contentEquals(corrId)) { //把收到的响应数据,放入阻塞集合 response.offer(new String(message.getBody(), "UTF-8")); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //开始从队列接收响应数据 ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback); //返回保存在集合中的响应数据 return response.take(); } public static void main(String[] args) throws Exception { RPCClient client = new RPCClient(); while (true) { System.out.print("求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); String r = client.call(""+n); System.out.println(r); } } }
package rabbitmq.rpc; import java.io.IOException; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); ch.queueDeclare("rpc_queue",false,false,false,null); ch.queuePurge("rpc_queue");//清除队列中的内容 ch.basicQos(1);//一次只接收一条消息 //收到请求消息后的回调对象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //处理收到的数据(要求第几个斐波那契数) String msg = new String(message.getBody(), "UTF-8"); int n = Integer.parseInt(msg); //求出第n个斐波那契数 int r = fbnq(n); String response = String.valueOf(r); //设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应 BasicProperties replyProps = new BasicProperties.Builder() .correlationId(message.getProperties().getCorrelationId()) .build(); ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); //发送确认消息 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; // CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认 ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback); } protected static int fbnq(int n) { if(n == 1 || n == 2) return 1; return fbnq(n-1)+fbnq(n-2); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)