最近在项目中使用了 activemq 进行消息的异步传递,只有一个消费者时,系统可以正常运行,但当增加多个消费者时,却只有一个消费者进行消费,其他的消费者不消费。
原因
activemq 有一个机制,叫消息预读取,这个机制默认会读取 1000 条消息发往一个消费者中,当超过 1000 消息堆积时,才会继续读取剩下的消息发往另一个消费者中。当只有一个消费者时,这个机制可以提高效率,但有多个消费者时,就不合适了,会导致其他消费者闲置。
解决办法
在客户端的连接上,拼接jmsprefetchPolicyall=xxx,就可以调整消息预读取条数,当有多个消费者时,可以适当的调低这个参数,保证其他消费者也可以消费到,从而提高消费速度。
完整的连接示例(以 spring boot 为例):
1
springactivemqbroker-url=tcp://localhost:61616jmsprefetchPolicyall=2在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。
MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。
我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+NET CORE 31
设计思路为:
内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。
那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。
下面结合代码讲下具体实现:
我们先看看业务队列发送消息时,如何定义
这里会内置上面描述的重试队列需要的参数
再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)
我们再看看PublishRetry重试队列的推送方法如何实现
重试队列的消费者实现
然后在系统中,内置重试队列消费者
首先在pom中加入依赖
然后配置yml文件
创建业务队列与死信队列
该消费者是消费死信队列中的消息
启动服务之后,可以看到创建的交换机和队列
消息过期之后从 prod_queue_pay 队列转发到 dl-queue 队列。很好的实现了消息延迟消费。但我们会发现一个问题,通过给队列属性设置过期时间,如果我现在有不同的场景,比如我5s、10s、15s之后延迟消费,那需要创建三个队列。每次有一个不同的时间段的需求过来,我都需要创建一个队列,这肯定不行。
快速入口: >按照个人理解,队列就是按顺序消费的数据结构,先进先出,不应该存在可以指定消费数据 *** 作这样的功能,一般都是你每次消费一个数据,然后判断这个数据是不是你要消费的数据,是的话按照逻辑处理,不是的话交由另一个逻辑处理,或者抛弃不处理酱紫的。
Mq通道和队列属于辅助的关系,他们两个需要共同建立才可以更好的作用。扩展资料
一个队列管理器可以有多个队列和多个通道。
队列管理器相当于RabbitMQ中的虚拟主机。
队列分为本地队列,远程队列,传输队列。
通道分为发送通道、接收通道、服务器通道等等。
发送到本地队列上的消息存储在本机上。
发送到远程队列上的消息,通过绑定传输队列传输到别的队列管理器上的本地队列上存储。
通道为消息进出队列的渠道桥梁,发送通道只能出,接收通道只能进,服务器通道可以进出。
本地例子:
发送消息===>Java程序===>服务器通道===>本地队列===>服务器通道===>Python程序===>收到消息
两家公司各自服务器AB例子:
公司A发送消息===>Java程序===>服务器通道A===>远程队列A>>>>传输队列A>>>>发送通道A>>>>接收通道B>>>>本地队列B===>服务器通道B===>Python程序===>公司B收到消息===>响应===>响应消息发送===>Python程序===>服务器通道B===>远程队列B>>>>传输队列B>>>>发送通道B>>>>接收队列A>>>>本地队列A===>服务器通道A===>Java程序===>公司A收到响应消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)