全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十七)延时队列

全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十七)延时队列,第1张

全站最硬核 百万字强肝RocketMq源码 火热更新中~(八十七)延时队列 commitLog与offset

如下图所示,producer发送消息到broker之后,会将消息具体内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数起始offset获取待消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。在这个过程中,consumer提交的offset为本次请求的起始消费位置,即beginOffset;consume Queue中的offset定位了commitLog中具体消息的位置。
consume Queue中每个消息索引信息长度为20bytes,包括8位长度的offset,记录commitLog中消息内容的位移;4位长度的size,记录具体消息内容的长度;8位长度的tagHashCode,记录消息的tag的哈希值(订阅时如果指定tag,会根据HashCode快速查找订阅的消息)

nextBeginOffset

对于consumer的消费请求处理(PullMessageProcessor.processRequest()),除了待消费的消息内容,broker在responseHeader(PullMessageResponseHeader)附带上当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。

minOffset、maxOffset是当前消费队列consumeQueue记录的最小及最大的offset信息。nextBeginOffset是consumer下次拉取消息的offset信息,即consumer对该consumeQueue的消费进度。

其中nextBeginOffset是consumer在下一轮消息拉取时offset的重要依据,无论当次拉取的消息消费是否正常,nextBeginOffset都不会回滚,这是因为rocketMQ对消费异常的消息的处理是将消息重新发回broker端的重试队列(会为每个topic创建一个重试队列,以%RERTY%开头),达到重试时间后将消息投递到重试队列中进行消费重试。对消费异常的处理不是通过offset回滚,这使得客户端简化了offset的管理。

所以其实offset就是对消费进度的一个管理单元,不纠结于一点,再继续读代码。

看下这两个方法,一个是更新进度,一个是计算发送时间戳

private void updateOffset(int delayLevel, long offset) {
    this.offsetTable.put(delayLevel, offset);
}

public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) {
        return time + storeTimestamp;
    }

    return storeTimestamp + 1000;
}

对于计算发送时间戳的逻辑,如果是有延迟等级的,也就是延时消费的消息,则存储的时间戳加延时时间;

如果是立即消费的,则存储的时间戳加1000ms。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715531.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存