生产者将消息投递到交换器,然后交换器再将消息路由到一个或者多个队列中。
RabbitMQ 定义了4种类型的交换器:
在使用交换器之前,需要先创建交换器,RabbitMQ 的 Java 客户端提供了 exchangeDeclare() 方法来声明交换器。
参数说明:
返回值:
方法重载:
有创建就会有删除,RabbitMQ 的 Java 客户端提供了 exchangeDelete() 方法来删除交换器。
参数说明:
返回值:
exchangeDeclarePassive() 方法用来检测交换器是否存在。如果存在,则正常返回;如果不存在,则抛出异常 404 channel exception
队列在 RabbitMQ 中用来存储消息,队列通过 BindingKey 与 交换器相互绑定。
与 exchangeDeclare() 方法相比, queueDeclare() 的重载方法少很多,只有两个重载方法:
参数说明:
返回值:
与交换器一样,队列也可以删除
参数说明:
返回值:
RMQ线上集群( v3612 )purge一个大量消息堆积(100W+)的队列时,有可能导致客户端报大量发送超时异常。
purge *** 作实际发生的事情( rabbit_amqqueue_process erl):
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS) ,
BQ默认对应 rabbit_variable_queue 模块:
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
rabbit_msg_store :remove(MsgIds, MCSState1)
end)
也就是purge *** 作,最终是由 rabbit_msg_store 进程来进行 *** 作的。
rabbit_msg_store 进程,每个节点只有一个,因此容易成为瓶颈。
rabbitmq 进程间通信采用 credit_flow 机制,一般一条持久化消息发送到持久化队列的大致过程如下:
rabbit_reader → rabbit_channel → rabbit_amqqueue_process → rabbit_msg_store
当 rabbit_msg_store 进程忙于大量消息的purge *** 作时,不能及时处理其上游 rabbit_amqqueue_process 的消息,这会导致其上游 rabbit_amqqueue_process 很快耗光其 credit 值,从而造成 flow。同理,
当 rabbit_amqqueue_process 进程由于 flow 被 block 住,不能及时处理其上游 rabbit_channel 的消息,导致 rabbit_channel 很快耗光其 credit 值,也造成 flow。最终限流状态会一直追溯到最上层 connection。
此时客户端发送会卡顿(表现出等待confirm超时等异常)
因此, 对于线上有业务量的RMQ集群,如果有大量消息堆积的队列需要清理,最好不要直接purge,有可能对线上业务造成影响 。
最保险的清理方式,起 Consumers 消费(接收即丢弃)
线上业务集群节点 TcpExtpruneCalled 指标报警,同时发现有一队列处于 flow 状态,进而判断持久化进程 rabbit_msg_store 出现瓶颈,通过 sar 工具发现以下异常:
最终定位到节点对应的宿主机底层IO有问题,及时进行规避。
Finding bottlenecks with RabbitMQ 33
Purge a large queue is slow
DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。
在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。
以下几种情况导致消息变为死信:
对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了BasicNack 或者BasicReject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
pomxml添加依赖
applicationproperties配置RabbitMQ连接信息
启动类实现
RabbitConfig类实现
GoController类实现
以上就是关于一文带你了解RabbitMQ到底是个什么鬼!全部的内容,包括:一文带你了解RabbitMQ到底是个什么鬼!、RabbitMQ架构、【MQ】RabbitMQ 交换器与队列等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)