1 概述
2 相关类介绍
3 同步刷盘原理
4 异步刷盘
RocketMQ和其他存储系统类似,如Redis等,提供了同步和异步两种刷盘方式,同步刷盘方式能够保证数据被写入硬盘,做到真正的持久化,但是也会让系统的写入速度受制于磁盘的IO速度;而异步刷盘方式在将数据写入缓冲之后就返回,提供了系统的IO速度,却存在系统发生故障时未来得及写入硬盘的数据丢失的风险。
RocketMQ提供了 SYNC_FLUSH 和 ASYNC_FLUSH 两种方式,也即同步和异步刷盘方式,同步刷盘在写入消息后会等待刷盘进度大于等于当前写入经度之后返回,而异步刷盘则在写入消息之后直接返回,不再等待刷盘进度。
在阅读本文前可先看文章 RocketMQ源码-MappedFile介绍 ,了解其中介绍的暂存池相关原理以及具体刷盘 *** 作时 commit 和 flush 动作的区别,本文在介绍刷盘时则不再赘述。
其实同步刷盘、异步刷盘和我们在文章 RocketMQ源码-主从同步复制和异步复制 介绍的同步复制、异步复制原理基本相同,同步刷盘也是阻塞等待当前刷盘进度大于等于此次写入进度然后返回,而异步刷盘写入之后直接返回,由后台线程定时进行刷盘动作。
如果配置的刷盘方式为同步方式,即 SYNC_FLUSH ,那么根据我们在文章 RocketMQ源码-MappedFile介绍 第8节介绍的注意事项可知,该配置肯定不会启用 MappedFile 的暂存池 TransientStorePool 功能。而 GroupCommitService 就是用于同步刷盘时进行实际的刷盘动作。
用于没有启用暂存池的异步刷盘动作,主要是定时触发 flush 动作。
用于启用了暂存池的异步刷盘动作,和 FlushRealTimeService 不同的是, CommitRealTimeService 在刷盘时会先将从暂存池借用的 ByteBuffer 中的数据 commit 到 fileChannel 中,然后调用 flush 对 fileChannel 进行刷盘 *** 作。
CommitLogputMessage 在写入消息之后,会调用 handleDiskFlush 进行刷盘相关处理,该方法实现如下:
相关的服务源码比较简单,和主从同步复制及其类似,这里不再介绍,建议阅读文章 RocketMQ源码-主从同步复制和异步复制 做对比理解。
异步刷盘则写入消息之后直接返回,由 ServiceThread 实现类 FlushRealTimeService 以及 CommitRealTimeService 在后台根据配置的刷盘频率进行异步刷盘, FlushRealTimeService 对未启用暂存池的 MappedFile 进行刷盘,而 CommitRealTimeService 则对启用了暂存池的 MappedFile 进行刷盘。
javamq大量数据怎么及时响应给客户端
javamq大量数据怎么及时响应给客户端
1、使用高性能的消息中间件,如Kafka,RabbitMQ等,提高消息传输的效率。
2、使用异步消息技术,采用消息队列的方式,将消息放入队列中,客户端可以立即收到消息,不用等待消息处理完成。
3、使用分布式消息中间件,将消息分发到不同的服务器上,以提高消息处理效率。
4、使用缓存技术,将消息缓存到内存中,以便快速响应客户端请求。
5、使用负载均衡技术,将消息分发到不同的服务器上,以提高消息处理效率。
后端可以生成一个以手机号为key的实体状态信息,存到缓存(设置过期时间)。等待消息返回回调或异常处理并更新状态信息
前端(轮询)调用后端写的一个接口去缓存拿到状态 根据状态码进行提示
一、缘起
MQ消息必达,架构上有两个核心设计点:
(1)消息落地
(2)消息超时、重传、确认
再次回顾消息总线核心架构,它由
发送端、服务端、固化存储、接收端
四大部分组成。
为保证消息的可达性,超时、重传、确认机制可能导致消息总线、或者业务方 收到重复的消息 ,从而对业务产生影响。
举个栗子:
购买会员卡,上游支付系统负责给用户扣款,下游系统负责给用户发卡,通过MQ异步通知。不管是上半场的ACK丢失,导致MQ收到重复的消息,还是下半场ACK丢失,导致购卡系统收到重复的购卡通知,都可能出现,上游扣了一次钱,下游发了多张卡。
消息总线的幂等性设计 至关重要,是本文将要讨论的重点。
二、上半场的幂等性设计
MQ消息发送上半场,即上图中的1-3
1,发送端MQ-client将消息发给服务端MQ-server
2,服务端MQ-server将消息落地
3,服务端MQ-server回ACK给发送端MQ-client
如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息, MQ系统内部必须生成一个inner-msg-id ,作为去重和幂等的依据,这个内部消息ID的特性是:
(1)全局唯一
(2)MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽
有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。
三、下半场的幂等性设计
MQ消息发送下半场,即上图中的4-6
4,服务端MQ-server将消息发给接收端MQ-client
5,接收端MQ-client回ACK给服务端
6,服务端MQ-server将落地消息删除
需要强调的是,接收端MQ-client回ACK给服务端MQ-server,是消息消费业务方的主动调用行为,不能由MQ-client自动发起,因为MQ系统不知道消费方什么时候真正消费成功。
如果5丢失,服务端MQ-server超时后会重发消息,可能导致MQ-client收到重复的消息。
此时重发是MQ-server发起的,消息的处理是消息消费业务方,消息重发势必导致业务方重复消费(上例中的一次付款,重复发卡),为了保证业务幂等性 ,业务消息体中,必须有一个biz-id ,作为去重和幂等的依据,这个业务ID的特性是:
(1)对于同一个业务场景,全局唯一
(2)由业务消息发送方生成,业务相关,对MQ透明
(3)由业务消息消费方负责判重,以保证幂等
最常见的业务ID有:支付ID,订单ID,帖子ID等。
具体到支付购卡场景,发送方必须将支付ID放到消息体中,消费方必须对同一个支付ID进行判重,保证购卡的幂等。
有了这个业务ID,才能够保证下半场消息消费业务方即使收到重复消息,也只有1条消息被消费,保证了幂等。
三、总结
MQ为了保证消息必达,消息上下半场均可能发送重复消息,如何保证消息的幂等性呢?
上半场
MQ-client生成inner-msg-id,保证上半场幂等。
这个ID全局唯一,业务无关,由MQ保证。
下半场
业务 发送方 带入biz-id,业务 接收方去重 保证幂等。
这个ID对单业务唯一,业务相关,对MQ透明。
结论:幂等性,不仅对MQ有要求,对业务上下游也有要求。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的 中间件 设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
那么,基于 RabbitMQ(消息队列)实现数据异步入库有什么好处呢?
RabbitMQ客户端模块
参考:
RabbitMQ基础概念详细介绍
消息队列
在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。
MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。
我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+NET CORE 31
设计思路为:
内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。
那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。
下面结合代码讲下具体实现:
我们先看看业务队列发送消息时,如何定义
这里会内置上面描述的重试队列需要的参数
再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)
我们再看看PublishRetry重试队列的推送方法如何实现
重试队列的消费者实现
然后在系统中,内置重试队列消费者
消息队列接收到请求后,会将消息顺序写入Physic log文件,对于延迟消息,将消息按照将要投递的时间,以小时为单位异步分割存储,每个小时的消息顺序写入延迟物理文件delay log,并把索引存储在delay index文件,索引记录消息在delay log中的{offset,size,投递时间戳}元信息,基于有限内存以及延迟消息分发特性,我们仅将最近两个小时的delay log文件序列采用mmap内存映射机制进行读写,延迟2个小时以上的消息直接写入磁盘文件。
但消息分发时,每次需要将一个小时的的索引文件,全部加载到内存,由于每个小时的消息索引是顺序写入delay index的,而消息分发投递时间又是随机的,写入顺序与消息投递顺序并不一致,所以,索引加载到内存后,需要按照消息具体投递的秒级时间戳进行排序,再根据排序后的索引读取delay log中的消息进行分发投递。
这种存储方案有以下问题: 1一次需要加载整个小时的消息索引到内存,若并发比较高,内存压力比较大。 2按照消息投递秒级时间戳进行排序后,实时到来的新的消息,需要实时插入排序,性能较低,延迟大。
为了解决上述问题,我们将delay index中索引元信息{offset, size, 投递时间戳}改为{offset, size, localIndex, globalIndex, preGlobalIndex },其中:
通过globalIndex可以直接定位到delay index中的索引单元,从而确定delay log中的一条消息,而preGlobalIndex又可以定位到同一秒内的上一条消息,因此只要落地存储每个小时,每秒最后一条消息的索引ID,即可逆序查出每秒所有消息。一个小时内只有3600秒,只需要将3600个16字节的索引ID加载到内存,即可实现每秒消息的实时加载。
为了降低消息分发延迟,可将最近10s的消息索引提前预加载到内存,对于实时接收到的消息,根据时间戳匹配到对应的秒,更新这一秒最新一条消息的索引globalIndex与逆向索引preGlobalIndex,不需要做排序,消息插入与读取的复杂度都为O(1)。
采用的由数组加链表实现的多级时间轮机制,分别是秒级和小时级,小时级时间轮前移一个槽,对应秒级时间轮旋转一圈,秒级时间轮上一共3600个槽,每个槽的时间跨度最大为1s,时间轮每秒前移一个槽。小时级实践论每个槽时间跨度是1小时,每小时移动一个槽,将后面两个小时的delay log开启内存映射,同时清除两个小时之前delay log文件内存映射。
当我们只有一个2小时5分钟的消息发送时,秒级时间轮需要推动2圈后即小时级时间轮移动2个槽,剩5分钟的延迟,再降级到秒级时间轮。这叫造成了时间轮的空转。
一般会把每个使用到的槽都会放到DelayQueue中,然后根据DelayQueue来 协助时间轮的推进 ,防止空推进的情况。例如,当有延迟500s的任务时,除了挂载到时间轮外,我们还会把其放到DelayQueue中,这样DelayQueue的头结点为延迟500s,如果期间没有小于500s的延迟任务再加进来时,我们只需要等待500s,时间轮推进一次即可。如果有小于500s的定时任务新加进来,我们只需要唤醒DelayQueue,重新计算等待时间即可。
即当有定时任务新增时,如果对应槽为新槽(即新增任务为该槽的第一个任务),在DelayQueue中增加延迟任务,并判断是否为头结点,是的话唤醒DelayQueue重新计算等待时间。
当master发生漂移或者网络异常时,时间轮分发控制需要从原master节点切换到新的master节点。为了保证分发状态的连续性与一致性,master节点定时每隔50ms分别将两个时间轮上分发的tick信息同步到其它slave节点。通过tick可确定具体分发到第几秒,但不能确定分发到这一秒的第几条消息,为此二级时间轮增加同步了一个参数localIndex,记录当前秒分发到第几条消息,并且每个节点都会定时将分发状态持久化。
每当master发生切换时,原master节点切换为slave,会立即停止当前时间轮的分发任务,并清空分发状态;而新的master节点根据当前已同步过来的分发状态初始化两级时间轮,但master切换会有一定的延迟周期或者极端情况下不同节点间时钟存在偏差,新的master初始完时间轮的tick后,该tick对应的秒级时间戳有可能与节点实际时间不一致,启动分发任务前需要做特殊调整,若tick时间戳小于当前时间,则分发任务sleep等待直至时间对齐,若tick时间戳大于当前时间,说明存在已到期的消息未分发,此时连续推进tick迁移,并对到期消息直接异步投递,直到tick对应时间戳小于当前时间。
正常master切换分为两种情况,一种为主动释放master,如节点重启与master负载均衡过程,这种情况节点在drop master之前,会首先同步时间轮分发状态到其它slave节点,此时master切换时间轮分发时完全连续一致的;另一种是一些异常情况下master被动漂移,此时新的master节点上时间轮分发状态可能存在最大50ms的延迟,会出现部分消息重复分发现象。把时间轮分发状态信息封装到到期投递消息协议扩展字段中,paxos请求同步消息时携带时间轮状态,即可做到实时同步。
众所周知,RocketMQ是支持消息过滤的,即发送消息时,可以给消息设置一个TAG。订阅主题的时候,可以设置只消费携带某些TAG的消息,起到消息过滤的作用。
客户端拉取消息时,在服务端得到tag的hash集合codeSet,然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer。
因为Hash存在冲突,过滤不完全准确,所以,客户端收到消息后,会进行再次精准过滤。
还有一种过滤方式,把TAG通过哈希转换为long,索引中保存所有TAG的哈希值按位或的结果。当拉取消息时,通过订阅设置的TAG哈希值与索引中的哈希值进行按位与 *** 作,如果结果等于订阅设置的TAG哈希值,说明该索引对应的消息可能符合条件,二次精准过滤依旧在客户端来做;否则,一定不符合条件,直接过滤掉。
以上就是关于RocketMQ源码-同步和异步刷盘全部的内容,包括:RocketMQ源码-同步和异步刷盘、javamq大量数据怎么及时响应给客户端、使用RabbitMQ异步调用第三方接口怎么给前端响应问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)