如下图所示,producer发送消息到broker之后,会将消息具体内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数起始offset获取待消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。在这个过程中,consumer提交的offset为本次请求的起始消费位置,即beginOffset;consume Queue中的offset定位了commitLog中具体消息的位置。
consume Queue中每个消息索引信息长度为20bytes,包括8位长度的offset,记录commitLog中消息内容的位移;4位长度的size,记录具体消息内容的长度;8位长度的tagHashCode,记录消息的tag的哈希值(订阅时如果指定tag,会根据HashCode快速查找订阅的消息)
对于consumer的消费请求处理(PullMessageProcessor.processRequest()),除了待消费的消息内容,broker在responseHeader(PullMessageResponseHeader)附带上当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。
minOffset、maxOffset是当前消费队列consumeQueue记录的最小及最大的offset信息。nextBeginOffset是consumer下次拉取消息的offset信息,即consumer对该consumeQueue的消费进度。
其中nextBeginOffset是consumer在下一轮消息拉取时offset的重要依据,无论当次拉取的消息消费是否正常,nextBeginOffset都不会回滚,这是因为rocketMQ对消费异常的消息的处理是将消息重新发回broker端的重试队列(会为每个topic创建一个重试队列,以%RERTY%开头),达到重试时间后将消息投递到重试队列中进行消费重试。对消费异常的处理不是通过offset回滚,这使得客户端简化了offset的管理。
所以其实offset就是对消费进度的一个管理单元,不纠结于一点,再继续读代码。
看下这两个方法,一个是更新进度,一个是计算发送时间戳
private void updateOffset(int delayLevel, long offset) { this.offsetTable.put(delayLevel, offset); } public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { Long time = this.delayLevelTable.get(delayLevel); if (time != null) { return time + storeTimestamp; } return storeTimestamp + 1000; }
对于计算发送时间戳的逻辑,如果是有延迟等级的,也就是延时消费的消息,则存储的时间戳加延时时间;
如果是立即消费的,则存储的时间戳加1000ms。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)