- 1、MQ介绍
- 1.1、应用场景
- 1.1.1、异步处理
- 1.1.2、应用解耦
- 1.1.3、流量削锋
- 1.1.4、日志处理
- 1.1.5、消息通讯
- 1.2、示例
- 1.2.1、电商系统
- 1.2.2、日志收集系统
- 1.2.3、事务处理
- 1.3、MQ 对比
- 2、RocketMQ 基本理论
- 2.1、消息存储
- 2.2、消息发送
- 2.3、消息消费
- 2.3.1、广播消费
- 2.3.2、集群消费
- 2.3.2.1、平均分配算法
- 2.3.2.2、环形平均算法
- 2.3.2.3、机房临近法
- 3、RocketMQ 集群搭建
- 3.1、网络架构
- 3.2、集群搭建方式
- 3.2.1、集群特点
- 3.2.2、集群模式
- 3.2.2.1、单Master模式
- 3.2.2.2、多Master模式
- 3.2.2.3、多Master多Slave模式(异步)
- 3.2.2.4、多Master多Slave模式(同步)
- 3.3、双主双从集群搭建
- 3.3.1、总体架构
- 3.3.2、集群工作流程
- 4、深入理解RocketMQ
- 4.1、消息存储
- 4.1.1、存储模型
- 4.1.2、消息存储结构
- 4.1.3、存储介质
- 4.1.4、刷盘机制
- 4.2.5、高可用性机制
- 4.2.5.1 消息消费高可用
- 4.2.5.2 消息发送高可用
- 4.2.5.3、消息主从复制
- 4.3.4、消息的存储和发送
- 4.3.4.1、消息存储
- 4.3.4.2 、消息发送
- 4.4、顺序消息
- 4.4.1、什么是顺序消息
- 4.4.2、RocketMQ顺序消息实现
- 4.4.3、如何保证顺序
- 4.5、延时消息
- 4.6、批量消息
- 4.7、过滤消息
- 5.8、消息去重
- 4.9.1、堆积能力
- 4.9.2、跳过非重要消息
- 4.10、事务消息
- 4.10.1、流程分析
- 4.10.2、发送事务消息
- 4.10.2.1、创建事务性生产者
- 4.10.2.2、实现事务的监听接口
- 4.10.3、使用限制
- 4.11、负载均衡
- 4.11.1、Producer负载均衡
- 4.11.2、Consumer负载均衡
- 4.12、消息重试
- 4.13、死信队列
- 4.13.1、死信特性
- 4.14、消费幂等
- 4.14.1、消费幂等的必要性
- 4.14.2、处理方式
- 5、源码分析
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。
优势:
- 削峰填谷:大促等流量洪流突袭时,MQ可以缓冲突发流量,避免下游订阅系统因突发流量崩溃
- 系统解耦:解决不同程度、不同能力级别系统间依赖导致一死全死
- 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
劣势: - 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
- 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。
- 一致性问题:
MQ 可应用在多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、
快递物流、广告营销、社交、即时通信、手游、视频、物联网、车联网等。
例如:
- 日志监控,作为重要日志的监控通信管道,将应用日志监控对系统性能影响降到最低。
- 消息推送,为社交应用和物联网应用提供点对点推送,一对多广播式推送的能力。
- 金融报文,发送金融报文,实现金融准实时的报文传输,可靠安全。
- 电信信令,将电信信令封装成消息,传递到各个控制终端,实现准实时控制和信息传递。
从功能角度考虑
RocketMQ 在实际应用中常用的使用场景、主要有异步处理,应用解耦,流量削锋和消息通讯四个场景。
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是 50 毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是 50 毫秒。因此架构改变后,系统的吞吐量提高到每秒 20 QPS。
系统的耦合性越高,容错性就越低
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存 *** 作
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。 秒杀业务根据消息队列中的请求信息,再做后续处理
日志采集客户端,负责日志数据采集,定时写受写入 Kafka 队列 Kafka 消息队列,负责日志数据的接收,存储和转发 日志处理应用:订阅并消费 kafka 队列中的日志数据
客户端 A 和客户端 B 使用同一队列,进行消息通讯。
客户端 A,客户端 B,客户端 N 订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
银行转账:
目前主流的 MQ 主要是:
- ZeroMQ
- 推特的 Distributedlog
- ActiveMQ:Apache 旗下的老牌消息引擎
- RabbitMQ、Kafka:AMQP 的默认实现。
- RocketMQ
- Artemis:Apache 的 ActiveMQ 下的子项目
- Apollo:同样为 Apache 的 ActiveMQ 的子项目的号称下一代消息引擎
- 商业化的消息引擎 IronMQ
- 实现了 JMS(Java Message Service)标准的 OpenMQ
- 消费者发送的 Message 会在 Broker 中的 Queue 队列中记录
- 一个 Topic 的数据可能会存在多个 Broker 中
- 一个 Broker 存在多个 Queue
每个 Topic 在 Broker 上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向 commit log 的消息索引
消息发送及接受流程:
一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
这里所谓的平均分配算法,并不是指的严格意义上的完全平均,如上面的例子中,10个queue,而消费者只有 4 个,无法是整除关系,除了整除之外的多出来的 queue,将依次根据消费者的顺序均摊。
consumer-1:3 个
consumer-2:3 个
consumer-3:2 个
consumer-4:2 个
是指根据消费者的顺序,依次在由 queue 队列组成的环形图中逐个分配
Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
Message Queue:相当于是Topic的分区;用于并行发送和接收消息
- Producer:
消息生产者,位于用户的进程内,Producer 通过 NameServer 获取所有 Broker 的路由信息,根据负载均衡策略选择将消息发到哪个 Broker,然后调用 Broker 接口提交消息。 - Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。 - Consumer:
消息消费者,位于用户进程内。Consumer 通过 NameServer 获取所有 broker 的路由信息后,向Broker发送Pull请求来获取消息数据。Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有 Consumer,集群模式下消息只会发送给一个 Consumer。 - Consumer Group:
消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。 - Topic:
Topic 用于将消息按主题做划分,Producer 将消息发往指定的 Topic,Consumer 订阅该Topic 就可以收到这条消息。Topic 跟发送方和消费方都没有强关联关系,发送方可以同时往多个 Topic 投放消息,消费方也可以订阅多个 Topic 的消息。在 RocketMQ 中,Topic 是一个上逻辑概念。消息存储不会按 Topic 分开。
Topic 表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息等。一条消息必须有一个 Topic。最细粒度的订阅单位,一个 Group 可以订阅多个 Topic 的消息。 - Message:
代表一条消息,使用MessageId唯一识别,用户在发送时可以设置 messageKey,便于之后查询和跟踪。一个 Message 必须指定 Topic,相当于寄信的地址。Message 还有一个可选的 Tag 设置,以便消费端可以基于 Tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 Broker 上的消息,方便在开发过程中诊断问题。 - Tag:
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Tag 表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息等。RocketMQ 提供 2 级消息分类,方便灵活控制。 - Broker:
Broker 是 RocketMQ 的核心模块,负责接收并存储消息,同时提供 Push/Pull 接口来将消息发送给 Consumer。Consumer 可选择从 Master 或者 Slave 读取数据。多个主/从组成 Broker集群,集群内的 Master 节点之间不做数据交互。Broker 同时提供消息查询的功能,可以通过 MessageID 和 MessageKey 来查询消息。Borker 会将自己的 Topic 配置信息实时同步到NameServer。 - Queue:
Topic 和 Queue 是 1 对多的关系,一个 Topic 下可以包含多个 Queue,主要用于负载均衡。发送消息时,用户只指定 Topic,Producer 会根据 Topic 的路由信息选择具体发到哪个 Queue 上。Consumer 订阅消息时,会根据负载均衡策略决定订阅哪些 Queue 的消息。
消息的物理管理单位。一个 Topic 下可以有多个 Queue,Queue 的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。 - Offset:
RocketMQ 在存储消息时会为每个 Topic 下的每个 Queue 生成一个消息的索引文件,每个
Queue 都对应一个 Offset记录当前 Queue 中消息条数
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
3.2.2.2、多Master模式一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:
配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高; - 缺点:
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
3. 优点:
即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
4. 缺点:
Master宕机,磁盘损坏情况下会丢失少量消息。
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
5. 优点:
数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
6. 缺点:
性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
消息高可用采用2m-2s(同步双写)方式
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
RocketMQ文件存储模型层次结构如下图所示,根据类别和作用从概念模型上大致可以划分为5层:
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的;
CommitLog:消息真正的物理存储文件;
ConsumeQueue:消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。
每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
CommitLog:存储消息的元数据
ConsumerQueue:存储消息在CommitLog的索引
IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
- 关系型数据库DB:
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。 - 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
性能对比:
文件系统>关系型数据库DB
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写 *** 作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的一个。
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
4.2.5.2 消息发送高可用在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
1)同步复制
同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
2)异步复制
异步复制方式是只要Master写成功即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成SYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
3)总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
4.3.4、消息的存储和发送 4.3.4.1、消息存储磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
4.3.4.2 、消息发送Linux *** 作系统分为【用户态】和【内核态】,文件 *** 作、网络 *** 作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的 *** 作,实际进行了4 次数据复制,分别是:
- 从磁盘复制数据到内核态内存;
- 从内核态内存复制到用户态内存;
- 然后从用户态内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复制到网卡中进行传输。
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的。
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制, 其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存, 这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了4.4、顺序消息 4.4.1、什么是顺序消息
顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消息包含两种类型:
分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。
这是阿里云上对顺序消息的定义,把顺序消息拆分成了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?
如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是需要用户自己去保障的而对于顺序消费,则需要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为什么不说他们应该在一个线程中被消费呢?)
全局顺序其实是分区顺序的一个特例,即使Topic只有一个分区(以下不在讨论全局顺序,因为全局顺序将面临性能的问题,而且绝大多数场景都不需要全局顺序)
在MQ的模型中,顺序需要由3个阶段去保障:
- 消息被发送时保持顺序:意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。
- 消息被存储时保持和发送的顺序一致:要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。
- 消息被消费时保持和存储的顺序一致:要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序):
- 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去。
- 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证。
- 消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
4.6、批量消息批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
4.7、过滤消息在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC","TAGA||TAGB||TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。
5.8、消息去重造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办 法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
- 消费端处理消息的业务逻辑保持幂等性:
保持幂等性,不管来多少条重复消息,最后处理的结果都一样。应该在消费端实现,不属于消息系统要实现的功能 - 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现:
是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。 RocketMQ不保证消息不重复,如果业务需要保证严格的不重复消息,需在业务端去重。
衡量消息中间件堆积能力的几个指标:
在有Slave情况下,Master一旦出现Consumer访问堆积在磁盘的数据时,会向Consumer下达一个定向吐指令,令Consumer从Slave拉取数据,这样正常消费的Consumer不会因为消息堆积受影响,因为系统堆积场景与非堆积场景分割在了两个不同的节点处理。Slave的写性能不会因此下降,因为Slave的消息写入只追求吞吐量,不追求实时性,只要整体的吞吐量高就可以,而Slave每次都是从Mater拉取一批数据,入1M,这种批量顺序写入方式即使堆积,整体吞吐量影响相对较小。
4.9.2、跳过非重要消息发生消息堆积时,如果消费速度一直追不上发送速度,可以选择丢弃不重要的消息
4.10、事务消息 4.10.1、流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1、事务消息发送及提交
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit *** 作生成消息索引,消息对消费者可见)
2、事务补偿
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3、事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- CommitTransaction:提交事务,它允许消费者消费此消息。
- RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
- Unknown:中间状态,它代表需要检查消息队列来确定状态。
使用TransactionMQProducer类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
public static void main(String[] args) throws MQClientException, InterruptedException { //创建事务监听器 TransactionListener transactionListener = new TransactionListenerImpl(); //创建消息生产者 TransactionMQProducer producer = new TransactionMQProducer("group6"); producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); //生产者这是监听器 producer.setTransactionListener(transactionListener); //启动消息生产者 producer.start(); String[] tags = new String[]{ "TagA", "TagB", "TagC" } ; for (int i = 0; i < 3; i++) { try { Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i, ("HelloRocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); TimeUnit.SECONDS.sleep(1); }catch(MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } }4.10.2.2、实现事务的监听接口
当发送半消息成功时,我们使用executeLocalTransaction方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public classTransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object o) { System.out.println("执行本地事务"); if (StringUtils.equals("TagA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TagB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("MQ检查消息Tag【"+messageExt.getTags()+"】的本地事务执行结果"); return LocalTransactionState.COMMIT_MESSAGE; } }4.10.3、使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax)则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
- 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至Queue 1,以此类推。
1)集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条messagequeue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
1)广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。
1)顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
1)无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
重试次数:
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
**注意:**一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
配置方式
1)消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 Action.ReconsumeLater (推荐)
- 返回 Null
- 抛出异常
2)消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
3)自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
Propertiesproperties=newProperties();//配置对应GroupID的最大消息重试次数为20次properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); Consumerconsumer=ONSFactory.createConsumer(properties)
注意: - 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。 - 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。 - 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
4)获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
public class MessageListenerImpl implements MessageListener{ @Override public Action consume(Message message, ConsumeContext context) { //获取消息的重试次数 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }4.13、死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-LetterMessage),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
4.13.1、死信特性死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性
- 一个死信队列对应一个 Group ID,而不是对应单个消费者实例
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
4.14.1、消费幂等的必要性在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
- 发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。 - 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
Message message = new Message(); message.setKey("ORDERID_100"); SendResult sendResult = producer.send(message);
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test","*",newMessageListener(){ publicActionconsume(Messagemessage,ConsumeContextcontext){ Stringkey=message.getKey() //根据业务唯一标识的key做幂等处理 } });5、源码分析
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)