一篇文章带你了解RabbitMQ

一篇文章带你了解RabbitMQ,第1张

一篇文章带你了解RabbitMQ 四大核心概念

生产者:产生数据发送消息的程序是生产者

交换机:交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

Rabbit的六大模式

 

工作原理

 

Broker:接收和分发消息的应用,RabbitMQServer就是MessageBroker

Virtualhost:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQserver提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

Connection:publisher/consumer和broker之间的TCP连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQPmethod包含了channelid帮助客户端和messagebroker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了 *** 作系统建立TCPconnection的开销Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routingkey,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

Queue:消息最终被送到这里等待consumer取走

Binding:exchange和queue之间的虚拟连接,binding中可以包含routingkey,Binding信息被保存到exchange中的查询表中,用于message的分发依据

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。

应答策略:

1、自动应答(默认):消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被 *** 作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

应答方式:A.Channel.basicAck(用于肯定确认)RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了

B.Channel.basicNack(用于否定确认)

C.Channel.basicReject(用于否定确认)与Channel.basicNack相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了

2、手动应答:默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。同时手动应答的好处是可以批量应答并且减少网络拥堵

multiple的true和false代表不同意思true代表批量应答channel上未应答的消息比如说channel上有传送tag的消息5,6,7,8当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答;false同上面相比只会应答tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

之前我们创建的队列都是非持久化的,rabbitmq如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable参数设置为持久化;

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。

分发策略

1、轮询分发(默认):在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。

2、不公平分发:为了避免这种情况,我们可以设置参数channel.basicQos(1);

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后rabbitmq就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他存储任务的策略。

3、预取值分发:本身消息的发送就是异步发送的,所以在任何时候,channel上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

这个时候就可以通过使用basic.qos方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息5、6、7,8,并且通道的预取计数设置为4,此时RabbitMQ将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被ack。比方说tag=6这个消息刚刚被确认ACK,RabbitMQ将会感知这个情况到并再发送一条消息。消息应答和QoS预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为1是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,所以在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

发布确认原理

生产者将信道设置成/confirm/i模式,一旦信道进入/confirm/i模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

/confirm/i模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

开启发布确认的方法:发布确认默认是没有开启的,如果要开启需要调用方法/confirm/iSelect,每当你要想使用发布确认,都需要在channel上调用该方法

前提:队列开启持久化 ,消息开启持久化,开启发布确认

策略:

1、单个发布确认:这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

2、批量发布确认:上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量;

当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

3、异步发布确认:异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentlinkedQueue这个队列在/confirm/i

callbacks与发布线程之间进行消息的传递。

发布订阅

交换机:生产者只能将消息发送给交换机,交换机接受到消息之后,再决定是将他们推送到一个队列还是多个队列或者是丢弃,由交换机的类型决定;

交换机的类型:

1、直接(direct):routingKey不一样,如果一样的话其实和fanout类型是差不多的

例如:对不同队列绑定不同的routeKey,消费者可以根据自身逻辑处理接受到的日志(打印,存磁盘等)

 

 2、主题(topic):因为直接队列不同的routingKey不能被多个队列消费(warning你又想给A队列又想给B队列),所以出现了主题队列,出现了主题交换机(规范 英文.英文.英文 长度不能超过255,*匹配单个,#匹配零个或多个,如果只有一个#相当于fanout,不出现*和#相当于direct,例如:"stock.usd.nyse","nyse.vmw",*.*.rabbit,azy.#)

 3、标题(headers):和topic交换机有点类似,但是他不是基于routingKey而是基于消息的头部

4、扇出(fanout):发布订阅,无差别发送(routingKey为“”)

例如:对多个队列发送无差别发送日志信息,消费者可以根据自身逻辑处理接受到的日志(打印,存磁盘等)

 5、无名类型(“”):不要交换机,单个队列

临时队列:每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。创建临时队列的方式如下:

String queueName=channel.queueDeclare().getQueue();

绑定(bindings):什么是bingding呢,binding其实是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是X与Q1和Q2进行了绑定

 死信队列:当A队列的消息被拒绝((basic.reject或basic.nack)并且requeue=false.),超过队列长度或者超时可以将消息放入到死信队列;

 

场景:下单成功超时未支付的订单

延迟队列(基于队列延迟和死信队列实现):队列内部是有序的,主要具备延时属性,延时队列的元素是希望在指定时间到达之前或之后处理,延迟队列就是用来存放在指定时间被处理的元素的队列;

场景:1、订单在十分钟内未支付则自动取消

2、新创建的店铺,如果十天内没有上传过商品,则发送消息提醒

3、用户注册成功后,三天内未登录则发送消息提醒

4、用户发起退款,三天内没处理则通知相关运营人员

5、预定会议,开会前十分钟通知会议相关参与人员

 

RabbitMQ中的TTL

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,

如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用,有两种方式设置TTL。

1、第一种是在创建队列的时候设置队列的“x-message-ttl”属性

2、另一种方式便是针对每条消息设置TTL

两者的区别:如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

我们介绍了死信队列,刚刚又介绍了TTL,至此利用RabbitMQ实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

 不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,

那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

 问题:在发送定时消息时,RabbitMQ只会检查第一个消息是否过期,如果第一个消息的过期时间很长,第二个消息的过期时间很短,第二个消息不会优先得到执行;

基于插件解决延时的问题:rabbitmq_delayed_message_exchange

在官网上下载https://www.rabbitmq.com/community-plugins.html,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重启RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

延时队列在需要延时处理的场景下非常有用,

使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景

发布确认(高级篇)

在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:

确认机制方案:

 

代码架构图:

 

首先application.yml开启回调spring.rabbitmq.publisher-/confirm/i-type=correlated

1、none(默认值):不开启

2、correlated:发布消息成功之后会触发回调

3、simpe(同步):两种效果,一、和correlated效果一样 二、调用waitFor/confirm/is或者waitFor/confirm/isOrDie方法,如果返回false可能会关闭channel

消息回退

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

问题:在仅开启了生产者确认机制的情况下,交换机接收到消息之后,会直接给生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者无感;(通过mandatory参数解决)

首先application.yml开启配置spring.rabbitmq.publisher-returns=true

实现RabbitTemplate.ReturnCallback接口(失败才会调用)

备份交换机

有了mandatory参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置mandatory参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码架构图

 

mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

消息重复消费

生产者发送消息,消费者接收消息之后返回ack前,网络中断,mq未收到确认消息,导致重发

解决:生成消息的全局ID,每次消费前判断是否消费过(数据库或者redis的setnx)

优先级队列

队列设置优先级(0-255,不建议设置过大,会吃内存和cpu),消息也设置优先级(0-255)

惰性队列(占用内存少,消费慢)

定义:消息保存在内存中还是磁盘上,正常情况下消息保存在内存中,而惰性队列的消息是保存在磁盘中;

使用场景:在消费者下线,宕机或因为维护而关闭时,导致长时间无法进行消费,惰性队列就很有必要了;

使用:队列具备两种模式:default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。

下面示例中演示了一个惰性队列的声明细节:

Map args = new HashMap();

args.put("x-queue-mode", "lazy");

channel.queueDeclare("myqueue",false,false,false,args);

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB

集群:

如果RabbitMQ集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisher/confirm/i机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

镜像队列:多个节点下默认在A节点创建的队列,A节点宕机之后其他节点 *** 作不了,导致消息丢失,使用镜像队列可以备份实现消息不丢失;

Haproxy+Keepalive实现高可用负载均衡:

整体架构图

 

HAProxy提供高可用性、负载均衡及基于TCPHTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。

扩展nginx,lvs,haproxy之间的区别:http://www.ha97.com/5646.html

试想如果前面配置的HAProxy主机突然宕机或者网卡失效,那么虽然RbbitMQ集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入Keepalived它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移.

联邦交换机/联邦队列(Federation Exchange/FederationQueue)

(broker北京),(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisher/confirm/i机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client深圳) (broker北京)之间有很大的网络延迟,(Client深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisher/confirm/i机制或者事务机制的情况下,(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用Federation插件就可以很好地解决这个问题.

联邦队列:可以在多个Broker节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstreamqueue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。

解决:数据同步

安装插件rabbitmq_federation,rabbitmq_federation_management

 

Shovel:Federation具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker上。Shovel可以翻译为"铲子",是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

解决问题:数据同步

安装插件:rabbitmq_shovel,rabbitmq_shovel_management

 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701947.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存