RabbitMQ 六个工作模式深入理解

RabbitMQ 六个工作模式深入理解,第1张

RabbitMQ 六个工作模式深入理解

文章目录
    • RabbitMQ 使用场景
      • 服务解耦
      • 流量削峰
      • 异步调用
    • 基本概念
      • Exchange-交换机
      • Queue-消息队列
      • Binding Key-绑定键
      • Routing Key-路由键
    • 六种工作模式
      • 简单模式——(寄信)——消息具有唯一性
      • 工作模式——(卖家发快递)——消息具有唯一性
        • 消息确认——手动回执ACK
        • 合理地分发——只接受1条消息
        • 消息持久化——队列持久化,消息持久化
      • 发布订阅模式——(发广播)——消息克隆
      • 路由模式——(可以调频接收的广播)——消息克隆
        • 生产者配置:
        • 消费者配置:
      • 主题模式——(可以调频接收的广播)——消息克隆
        • 交换机声明
      • RPC模式
        • 客户端代码:
        • 服务端代码:

RabbitMQ 使用场景 服务解耦

常规的微服务调用:

RabbitMQ解耦的情况:

流量削峰

高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

异步调用

请求放入RabbitMQ中后,不管后续。直接继续跑
RabbitMQ对接下游服务,慢慢消化。实现异步

基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化 *** 作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

Exchange-交换机

根据Binding规则将消息 路由 给服务器中的 队列
ExchangeType决定了 路由消息的行为
常用类型有:
direct、Fanout、Topic

Queue-消息队列

我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中,等待消费者来取。
(如果路由找不到相应的queue则数据会丢失)

Binding Key-绑定键

它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

Routing Key-路由键

生产者发送的,来指定这个消息的路由规则。
这个routing key需要与Exchange Type及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里。

六种工作模式 简单模式——(寄信)——消息具有唯一性


生产者就是发信人 ——> rabbitmq就是邮政 ——> 消费者就是收信人。

  • 消息向默认交换机发送
  • 默认交换机隐含与所有队列绑定
  • routing key 即为队列名称

因此
exchange参数:为空串
routingKey参数:对于默认交换机,路由键就是目标队列名称

工作模式——(卖家发快递)——消息具有唯一性

生产者就是卖家 ——> rabbitmq就是快递 ——> 消费者就是买家。


工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者。
工作模式,就是简单模式在多个消费者情况下的处理方式。需要消息确认(消费者挂了给其他人),需要合理分发(不回执不发送),需要持久化(防止rabbitmq挂掉)

消息确认——手动回执ACK

消费者配置中:
在处理消息的回调对象中,

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//发送回执。

接受消息的

channel.basicConsume("helloworld", f==alse==, callback, cancel);

一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?
我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者
所以我们需要手动回执。
消费者执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

合理地分发——只接受1条消息

消费者配置中:

channel.basicQos(1);//一次只接收一条消息

我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

消息持久化——队列持久化,消息持久化

当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
生产者,消费者者配置中:

//第二个参数是持久化参数durable
ch.queueDeclare("helloworld", ==true==, false, false, null);//队列持久化

额外属性设置为:
MessageProperties.PERSISTENT_TEXT_PLAIN//消息持久化

发布订阅模式——(发广播)——消息克隆


生产者需要发送一个路由键,用来指定我们声明的fanout交换机。
fanout交换机,和消费者队列形成绑定。执行群发

在订阅模式下,消费者不再绑定队列,而是每个消费者都有一个属于自己的队列。通过这个队列绑定交换机

生产者配置:
定义交换机

channel.exchangeDeclare("logs", "fanout");//定义名字为logs的交换机,交换机类型为fanout

发送消息,指定路由键即可。

//第一个参数,向指定的交换机发送消息
//第二个参数,不指定队列,由消费者向交换机绑定队列
//如果还没有队列绑定到交换器,消息就会丢失,
//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));

消费者配置:
这里

channel.queueBind(queueName, "logs", "");//这里logs是交换机名,路由键不路由键都无所谓。
路由模式——(可以调频接收的广播)——消息克隆


前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。

生产者配置:
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//声明交换机名
ch.basicPublish("direct_logs", level, null, msg.getBytes());//level为路由键。只能由绑定这个路由的,路由键匹配的接收
消费者配置:
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//消费者也声明这个交换机
channel.queueBind(queueName, "direct_logs", level);
//fanout绑定交换机就够了,direct模式则还需要声明本消费者对应的序列的路由键
主题模式——(可以调频接收的广播)——消息克隆


虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

Topic和路由模式基本一样,差别在与路由键可以匹配通配符 *. 和 .#

#:可以匹配路由键中 0-多个 单词
*:可以匹配路由键中 个单词

交换机声明
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//交换机需要改变
RPC模式

如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为 远程过程调用 ,即RPC
因此:这里我们不再理解为生产者消费者。而是客户端服务端

使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址,这里使用默认队列。

并且:
要考虑一个问题,响应消息在一个回调队列中,我们如何分辨这个响应是哪个请求的?
这时候我们需要一个唯一标识来标记每个请求——关联ID (correlationId)

客户端代码:
package rabbitmq.rpc;

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
	Connection con;
	Channel ch;
	
	public RPCClient() throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		con = f.newConnection();
		ch = con.createChannel();
	}
	
	public String call(String msg) throws Exception {
		//自动生成对列名,非持久,独占,自动删除
		String replyQueueName = ch.queueDeclare().getQueue();
		//生成关联id
		String corrId = UUID.randomUUID().toString();
		
		//设置两个参数:
		//1. 请求和响应的关联id
		//2. 传递响应数据的queue
		BasicProperties props = new BasicProperties.Builder()
				.correlationId(corrId)
				.replyTo(replyQueueName)
				.build();
		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
		
		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
		BlockingQueue response = new ArrayBlockingQueue(1);
		
		//接收响应数据的回调对象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
					//把收到的响应数据,放入阻塞集合
					response.offer(new String(message.getBody(), "UTF-8"));
				}
			}
		};

		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//开始从队列接收响应数据
		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
		//返回保存在集合中的响应数据
		return response.take();
	}
	
	public static void main(String[] args) throws Exception {
		RPCClient client = new RPCClient();
		while (true) {
			System.out.print("求第几个斐波那契数:");
			int n = new Scanner(System.in).nextInt();
			String r = client.call(""+n);
			System.out.println(r);
		}
	}
}
服务端代码:
package rabbitmq.rpc;

import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		ch.queueDeclare("rpc_queue",false,false,false,null);
		ch.queuePurge("rpc_queue");//清除队列中的内容
		
		ch.basicQos(1);//一次只接收一条消息
		
		
		//收到请求消息后的回调对象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//处理收到的数据(要求第几个斐波那契数)
				String msg = new String(message.getBody(), "UTF-8");
				int n = Integer.parseInt(msg);
				//求出第n个斐波那契数
				int r = fbnq(n);
				String response = String.valueOf(r);
				
				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(message.getProperties().getCorrelationId())
						.build();
				
				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
				//发送确认消息
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
	}

	protected static int fbnq(int n) {
		if(n == 1 || n == 2) return 1;
		
		return fbnq(n-1)+fbnq(n-2);
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683150.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存