KafkaConsumer中的位移提交

KafkaConsumer中的位移提交,第1张

KafkaConsumer中的位移提交

kafka--- consumer 消费消息
  • 目录
    • 概 述
  • 小结
  • 参考资料和推荐阅读

LD is tigger forever,CG are not brothers forever, throw the pot and shine forever.
Modesty is not false, solid is not naive, treacherous but not deceitful, stay with good people, and stay away from poor people.
talk is cheap, show others the code and KPI, Keep progress,make a better result.
Survive during the day and develop at night。

目录 概 述

KafkaConsumer中的位移提交:
说位移提交之前,我们首先简单的回顾一下位移和消费者位移之间的区别。我们通常所说的位移是指 TopicPartition 在 Broker 端的存储偏移量。而消费者位移是指某个消费者组在不同 TopicPartition 上面的消费偏移量。下面我们介绍一下消费者位移的提交方式,其中主要包含了自动提交和手动提交。
自动提交
对于启用自动提交位移,在 KafkaConsumer 实例初始化的时候,通过设置参数 enable.auto.commit 的值为 true 即可(默认为true)。同时与其相关联的参数 auto.commit.interval.ms,这个参数可以设置提交的时间间隔,这个值默认是5秒。
对于自动提交的触发条件,除了要满足时间的阈值,还需要Client端调用 KafkaConsumer.poll() 方法才能触发。每次执行都会调用 ConsumerCoordinator.poll() 执行消费者入组的流程,在方法执行的最后会执行一个异步的 offset 提交。实现代码如下:

手动提交
同步提交时通过 KafkaConsumer.commitSync()方法实现,其内部又调用了ConsumerCoordinator.commitOffsetsSync() 方法发送位移提交请求。同步的位移提交提供了重试的机制,其代码实现如下:

public boolean commitOffsetsSync(Map offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())
return true;

do {
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}

 RequestFuture future = sendOffsetCommitRequest(offsets);
 client.poll(future, timer);

 // 如有处理中的位移提交,则等待执行完成
 invokeCompletedOffsetCommitCallbacks();

 if (future.succeeded()) {
     if (interceptors != null)
         interceptors.onCommit(offsets);
     return true;
 }

 if (future.failed() && !future.isRetriable())
     throw future.exception();

 timer.sleep(rebalanceConfig.retryBackoffMs);

} while (timer.notExpired());

return false;
}

对于同步提交,官方文档上面提供了一次拉取,按照批次提交位移的方式,这样可以减少重复消费的批量。

对于同步提交,官方文档上面提供了一次拉取,按照批次提交位移的方式,这样可以减少重复消费的批量。

另外一种方式是,可以按照TopicPartition 分区的维度去提交位移。

异步提交
对于同步的位移提交,通常情况下会影响系统的吞吐量。此时KafkaConsumer也提供了异步的提交方式,也就是commitAsync()。但是相对同步的位移提交,此时异步提交缺少了重试的机制,同步的重试机制可以在网络抖动的场景下,减少提交失败的场景。异步提交在这里没有重试机制,是因为重试的时候消费位移可能已经变化,此时提交已经没啥意义了。

在生产上面,还是建议使用异步的位移提交,这样也可以提升客户端的TPS。对于提交的方式,笔者从网上找到了如下的提交方式:

小结 参考资料和推荐阅读

1.链接: 参考资料.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5635004.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存