Apache RocketMQ是统一的消息引擎,轻量级的数据处理平台。具有如下的特性:
1、低延迟(Low Latency):在高负载下能保证99.6%的毫秒级响应。
2、面向金融(Finance Oriented):具有跟踪和审计功能的高可用性。
3、行业可持续性(Industry Sustainable):支持万亿级的消息容量。
4、厂商中立(Vendor Neutral):自最新4.1版本以来使用新的开放分布式消息和流媒体标准。
5、大数据友好(BigData Friendly):高吞吐量的通用集成批量传输。
6、大规模堆积(Massive Accumulation):只要有足够的磁盘空间,就能在无性能损失的情况下持续堆积。
高可用分布式系统存在“三高”:高可用、高性能、高并发。其中Slave复制机制主要解决的是“三高”问题中的高可用部分。
我们都知道,小概率事件最终一定是会发生的,所以如果我们使用的是单台机器,终有一天出现单点故障的问题——提供服务的机器在故障期间无法提供服务。我们希望当出现单点故障的时候,我们能够有一台机器能替代故障机器提供服务,这就要求这台机器有故障机器在故障前的所有状态和数据,而本文提到的Rocket MQ 采用的Slave复制就是复制Master机器的状态和数据的一种策略。
技术设计 概览在RocketMQ中,消息的复制分为两种模式:同步复制和异步复制。其中同步复制是指在生产消息时,消息存储到本机后立刻复制到Slave机器中,当复制 *** 作完成之后,Broker才会返回「生产成功」的响应。
整个消息复制的设计中,RocketMQ使用的异步复制、选择性同步等待的模式。在消息复制的过程,涉及到好几个比较重要的模块:
1、复制请求双队列:该模块用于存储复制请求,供检查线程进行偏移量检查和同步复制状态。
2、写线程:主要根据Slave最新的消息偏移量进行消息日志的同步,涉及到很多偏移量计算的逻辑。
3、读线程:主要是接收Slave上报的确认偏移量,用于确定后续复制的最新的偏移量。
4、HA客户端:Slave与Master交互的主要组件,包含着日志消息的接收和消息接收偏移量的上报。
5、Commit Log:本地存储的消息的组件。
在进行生产消息时,Broker会先将消息写入本地的文件存储中,然后会提交一个同步请求,有一个循环的消费者线程会不断检查Slave复制的最新的偏移量,只有当最新的复制偏移量达到当前写入的偏移量时,才会响应「成功」。
技术细节 双队列优化优化的目标:降低消费者的消费时延
单队列的问题
生产者消费者问题分为4种情况:
1、Multi-Producer Multi-Consumer:MPMC,多生产者多消费者;
2、Multi-Producer Single-Consumer:MPSC,多生产者单消费者;
3、Single-Producer Multi-Consumer:SPMC,单生产者多消费者;
4、Single-Producer Single-Consumer:SPSC,单生产者单消费者;
情况一:MPMC的情况
这种场景下多个生产者和多个消费者竞争一个队列,可以考虑两边进行锁分离,这种一般是通过双端队列来实现的——生产者往队列尾部追加请求,消费者在队列头部消费请求,可以参考AQS的队列实现或者相关论文:链接: https://pan.baidu.com/s/1y-RshPQ97Ja8AgK_PflN8w 提取码: 9bc2。
通过双端队列进行优化,生产者和消费者基本不用进行锁的争抢。还可以可以考虑将队列分成多个,多队列进行锁请求的分摊,降低竞争的激烈程度。
同时采用上面两种方法:多队列 + 双端队列,通过优化,降低了消费端和生产端的延迟,将MPMC的情况分解成了多个MPSC的模型。
情况二:MPSC的情况
这种场景下多个生产者和单个消费者竞争一个队列,这会极大的降低消费者的效率,这种场景可以将队列分成两个,让消费者端完全无锁。
情况三:SPMC 的情况
可以考虑双端队列进行优化,生产者无锁。
这种场景下单个生产者和多个消费者竞争一个队列,这种场景可以将队列分成两个,让生产者端完全无锁。
更极端的情况是每个消费者都有一个队列,生产者这边控制往每个队列生产消息,这样能做到基本无锁。
情况四:SPSC 的情况
这种场景下单个生产者和单个消费者竞争一个队列,这种场景可以考虑使用双端队列,让两端基本无锁。
总而言之,优化手段主要有两个:
1、通过双端队列,消除生产者和消费者之间的锁竞争;
2、通过多队列,消除生产者与生产者之间,消费者与消费者、生产者消费者之间的锁竞争;
如果生产者生产快的话,选用的模型一般是SPMC、MPMC,我们的重点是尽量消除消费者之间、生产者和消费者之间的锁竞争,此时我们要采用双端队列或者多队列消除大量生产者对消费者的影响。为了提升消费者消费速率,我们可以使用多队列进行一个消费者的锁的负载均衡,降低对单独的锁的竞争的激烈程度。如果锁优化还是无法让生产者和消费者速率匹配,可以通过增加消费者来实现。
如果生产者生产慢于消费者,选用的模型一般是MPSC或者MPMC,我们的重点是消除生产者之间、消费者和生产者之间的所竞争,此时我们要采用双端队列或者多队列消除大量生产者对单一消费者的影响。为了提高生产者的效率,我们还可以使用多队列进行生产者端锁竞争的负载均衡,减少对单独锁的竞争的激烈程度。如果锁优化还是无法让生产者和消费者速率匹配,可以通过增加生产者来实现。
RocketMQ使用的是MPSC模型,使用了双队列,重点是消除了大量生产者对单一消费者的影响。
双队列的优化思路是消除消费者端的锁竞争,保证单一消费者的效率。
是否有机会降低生产者的生产时延?因为在本地持久化的时候会有加锁的 *** 作,所以我感觉可以理解成当到达提交复制请求的阶段的时候,锁的竞争是相对比较少的,所以生产者的在锁方面的损耗是相对比较少的,并非瓶颈。
如果真的需要降低的话,可以通过增加写队列的形式进行降低,相当于进行一个负载均衡,降低锁竞争的激烈程度,但这会引入一定的复杂性。
复制模型同步复制:在写入本地存储的时候,调用远程API写入Slave。
同步写单次只能同步当前的生产的消息,会导致每条消息都会产生一次网络交互,每条消息还要一一确认导致网络IO的增加,而且消息到达的顺序无法保证。
异步复制:在写入本地存储后,等待Slave将消息同步过去。
如果是异步写,等待Slave主动同步消息,有个好处就是可以批量处理消息,Slave可以批量将同一段时间生产的消息同步过去,然后直接一次性确认。
RocketMQ中异步备份 & 同步等待设计
A、Broker在写入本地存储之后,会产生一个同步请求,提交该同步请求后,复制服务会返回一个Future对象,通过复制线程和消息生产线程之间通过Future对象进行状态同步;
B、一个线程会定时检查Slave复制的最新偏移量,如果Slave的偏移啊大于等于当前请求的偏移量,则表示同步完成,可以将同步的结果设置为成功。
RocketMQ在提交异步任务之后,如何知道是否同步是否完成呢?通过CompletableFuture。
生产线程在提交复制请求后,会进行阻塞等待,真实复制是在复制线程完成的,一个固定的线程会不断检查复制偏移量,确定当前生产的消息是否完成复制,最后通过设置CompletableFuture通知生产线程。
单次同步的消息的数量:一条。
Slave复制偏移量的确认:定时上报当前处理偏移量 + 每次读请求上报最大偏移量。
异常模型 状态码 | 响应码映射 | 描述 | 生产者行为 |
PUT_OK | SUCCESS | 生产成功 | SEND_OK |
FLUSH_DISK_TIMEOUT | FLUSH_DISK_TIMEOUT | 刷盘超时 | FLUSH_DISK_TIMEOUT |
FLUSH_SLAVE_TIMEOUT | FLUSH_SLAVE_TIMEOUT | Slave复制超时 | FLUSH_SLAVE_TIMEOUT |
SLAVE_NOT_AVAILABLE | SLAVE_NOT_AVAILABLE | Slave不可用 | SLAVE_NOT_AVAILABLE |
MESSAGE_ILLEGAL | MESSAGE_ILLEGAL | 消息不合法 | 异常 |
CREATE_MAPEDFILE_FAILED | SYSTEM_ERROR | 创建日志文件失败 | 异常 |
PROPERTIES_SIZE_EXCEEDED | MESSAGE_ILLEGAL | 消息不合法 | 异常 |
SERVICE_NOT_AVAILABLE | SERVICE_NOT_AVAILABLE | 服务不可用 | 异常 |
OS_PAGECACHE_BUSY | SYSTEM_ERROR | 如果PageCache刷新时间比较长,则视为busy | 异常 |
LMQ_CONSUME_QUEUE_NUM_EXCEEDED | SYSTEM_ERROR | - | 异常 |
UNKNOWN_ERROR | SYSTEM_ERROR | 未知异常 | 异常 |
推模型:Master主动向Slave发送消息。Slave主动连接Master,Master会对Slave的连接进行存储,供消息推送使用。
拉模型:Slave主动向Master拉取消息。Master无需存储Slave的连接,Slave每次向Master拉取消息。
推模型可以做的优化空间相对比较小,因为每次在生产消息本地持久化之后,就需要向Slave推送,一是无法批量推送,进行网络IO优化;二是无法无法保证顺序,需要而外设计通过单线程异步复制保证顺序。
拉模型能比较好的实现批量消息传输和顺序传输,单是拉模型会比较依赖Slave这边的复制的效率,在Kafka中通过ISR进行优化,同样可以是解决。
当Slave都挂了...1、当处于Slave全部挂了时,Master的消息生产会不会停止?不会,HA不会影响消息生产的过程,也不会影响生产者的行为。
2、当Slave重新启动之后,消息落后将会怎么进行同步?我们可以通过消息日志文件,得出现在复制的最大偏移量,通过上报该偏移量,Master会主动推送该偏移量后续生产的消息。
3、新加入一个Slave,因为没有数据,将如何进行同步?从当前日志文件中最早的消息开始进行同步。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)