RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:RocketMQ:release-4.8.0。上一篇文章我们分析了RocketMQ的的消费超时/失败重试机制,最终会发送一个延时消息到Broker,本篇接着分析RockeTMQ延时消息的实现机制;
1、消息延时级别消息的延时级别level一共有18级,分别为:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
level有以下三种情况:
level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1slevel > maxLevel,则level== maxLevel,例如level==20,延迟2h 2、定时消息(延迟消息)简介
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic;
- 定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。注意:定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("producer_saint"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("consumer-timeout", "msg-body-001".getBytes(StandardCharsets.UTF_8)); msg.setTags("msg-body-001"); // 设置消息延时级别 msg.setDelayTimeLevel(3); List二、源码分析 1、整体实现流程list = new ArrayList<>(); list.add(msg); SendResult send = producer.send(list); System.out.println("sendResult: " + send); } }
整体流程如下图,executeonTimeup()方法部分代码太多,流程图中采用文字说明;
在Broker启动时会间接执行ScheduleMessageService#start(),执行启动延时消息服务 *** 作;下面我们从Broker的核心类BrokerController中开始看起;
1> BrokerStartup#start()这里是Broker启动的核心;关于Broker的启动流程,请参考这篇文章:RocketMQ:深度剖析Broker启动流程原理、源码
紧接着进入到BrokerController#start();在这里会启动消息持久化服务MessageStore。
2> DefaultMessageStore#start()
MessageStore是一个接口,下面会进入到它的实现类DefaultMessageStore中;在DefaultMessageStore#start()方法中会判断Broker是否开启了DLegerCommitLog,如果没有并且Broker的角色不是Slave,才会开启延时消息服务。
ScheduleMessageService#start()是开启延时消息服务的核心,下面我们接着看;
3、核心逻辑ScheduleMessageService#start()中主要做两件事:
- 为每个延时级别都分别开启一个定时任务,每秒执行一次发送延迟消息到真实Topic的 *** 作;延时10s为每个延时级别都分别开启一个定时任务,每10s做一次延时队列中消息偏移量的持久化;
public void start() { if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // delayLevelTable中存放着每个延时级别和其对应的消息offset for (Map.Entry1> ScheduleMessageService.this.persist()entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { // 1、每秒从"SCHEDULE_TOPIC_XXXX" topic中取数据 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 2、延时10s启动,并且每10s把每一个延迟队列的最大消息偏移量写入到磁盘中 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
我们先看ScheduleMessageService.this.persist(),点进去会进去到ConfigManager#persist()方法中;这里的 *** 作就单纯的持久化delayQueue的offset到delayOffset.json文件中;
public synchronized void persist() { // 读取offsetTable缓存的延迟队列的值 String jsonString = this.encode(true); if (jsonString != null) { // 读取delayOffset.json的文件地址 String fileName = this.configFilePath(); try { // 持久化到delayOffset.json文件中 MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }2> DeliverDelayedMessageTimerTask
DeliverDelayedMessageTimerTask是TimerTask的子类,表示一个线程任务;其主要作用是扫描延迟消息队列(SCHEDULE_TOPIC_XXXX)的消息,将该延迟消息转换为真实topic的消息。
1)DeliverDelayedMessageTimerTask#run()这里真实消息的topic有几个特殊之处:
对于(并发消费模式下)消费超时重试的消息而言,真实的topic是%RETRY%+consumerGroup;
我们接着看看这个线程任务内部,执行了executeonTimeup()方法;这是判断延时消息是否到应该转发到真实topic的核心逻辑。
由于代码篇幅过长,这里我们从五个问题着手分析;
- 根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么处理?延时消息是否到期?到期后怎么处理?延时消息到期后写入CommitLog失败怎么处理?延时消息没到期怎么处理?延时队列中的消息处理完怎么处理?
先看第一问题:
(1)根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么办?判断当前延时级别对应在SCHEDULE_TOPIC_XXXX主题下的queue是否存在,如果存在进入第二个问题,否则隔0.1s再次开启当前TimerTask;
再来看第二个问题:
在第一问题成立之后,我们已经获取到延时级别对应的延时队列,接下来首先要根据offset从ConsumeQueue中获取到延时消息的部分信息(offset、size、到期时间);接着再判断消息是否到期,并计算出下一个延时消息在延时队列中的offset;
如果消息到期,再从commitlog中根据ofsetPy取出完整的消息,解析出消息的真实Topic和Queue,并清除消息的延时属性,然后将消息写入到CommitLog中;
对于如何判断消息过期,我们再跟一下correctDeliverTimestamp()方法;
从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);
再来看第三个问题:
(3)延时消息到期后写入CommitLog失败怎么办?如果写入CommitLog失败,则延时10s重新开启当前TimerTask,持久化delayOffset;
再来看第四个问题:
(4)延时消息没到期怎么处理?如果消息未到期,则延时countdown(countdown为延时队列中第一个消息的剩余到期时间),开启一个TimerTask,并持久化delayOffset;
最后再看第五个问题:
(5)延时队列中的消息处理完怎么办?遍历完相应延时级别的延时队列后,更新下一次开始读取延迟队列的offset,然后延时0.1s开启当前TimerTask,并持久化delayOffset;最后退出当前方法;
executeonTimeup()方法完成代码如下:
public void executeOnTimeup() { // 根据延迟级别找到topic为SCHEDULE_TOPIC_XXXX的队列(队列的ID 为延时级别 - 1) ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; // 找到延时级别对应的队列 if (cq != null) { SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 消息的commitLog物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 消息大小 int sizePy = bufferCQ.getByteBuffer().getInt(); // 延迟结束时间,在消息写入到CommitLog之后会分发到consumeQueue; // 对于延迟消息而言,tagsCode存储的是消息的延迟到期时间 long tagsCode = bufferCQ.getByteBuffer().getLong(); if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { //can't find ext content.So re compute tags code. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } long now = System.currentTimeMillis(); // 计算是否到消息投递时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 定时任务下一次开始读取延迟队列的offset nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 剩余的延时时间 long countdown = deliverTimestamp - now; if (countdown <= 0) { // 根据CommitLog物理偏移量找到msg MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { // 解析消息体,取出真实的topic和queue(多为%RETRY% + consumerGroup) MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } // 将消息写入到commitLog中 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); // 写入成功,跳过该次循环判断下一条延迟消息是否达到到期时间 if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { log.info("send msg to real topic: {} from schedule topic: {}", msgInner.getTopic() ,msgExt.getTopic()); continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); // 写入消息失败,则延时10s重新执行TimerTask ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { // 这里说明,延时队列中最小到期的那条消息都还没到延迟时间 // 重新提交一个TimerTask,延迟执行时间为延时队列中第一个消息剩余的延时时间 ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); // 更新延时队列已消费的消息偏移量 ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for // 定时任务下一次开始读取延迟队列的offset nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 开始一个延时100ms执行的定时任务 消费延时队列 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); // 将下一次读取延迟队列的offset存放到一个缓存map中 ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) else { long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
源码分析完了,我们来总结一下实现原理。
三、实现原理消息的发送:
producer设置消息的delayLevel延迟级别,并在消息属性DELAY中存储对应的延时级别;broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始的topic和queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC_XXXX),queueId改为(延时级别-1)。
消息的处理:
四、对延时消息机制的思考MQ服务端(Broker)的ScheduleMessageService中,为每一个延迟级别分别开启一个定时器,定时(每隔1秒)从延迟级别对应的的ConsumeQueue消费队列中拉取消息;然后根据消费偏移量offset从commitLog中解析出对应的消息;从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);若到达了投递时间(消息到期),则构建一个新的消息,从源消息属性中解析出出真实的topic和queueId,并清除消息的延迟属性;将其写入到CommitLog中;
优点:
设计简单,把所有相同延迟时间的消息都先放到一个队列中,做定时扫描,可以保证消息消费的有序性;延时队列中的消息时按消息到期时间进行递增排序,也就是说队列中消息越靠前的到期时间越早;
缺点:
延时消息机制所有的定时任务都在一个定时器中,定时器采用的java.util.Timer,而Timer是单线程运行的;如果延迟消息的数量很大大的话,可能单线程处理不过来,也就会造成消息到期后没有及时发送出去的现象,甚至会造成消息拥堵;
可能的改进点:
为每个延迟队列上分别采用一个Timer;或者说仅使用Timer开始定时任务做扫描,而消息处理的核心逻辑使用线程池处理,进而提高消息处理的效率;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)