Kafka KStreams-处理超时

Kafka KStreams-处理超时,第1张

Kafka KStreams-处理超时

一些澄清:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
    提交间隔的下限,即,在提交之后,下次提交不会在此时间之前进行。基本上,Kafka Stream会在这段时间过后尝试尽快提交,但是并不能保证执行下一次提交实际上需要多长时间。
  • StreamsConfig.POLL_MS_CONFIG
    用于内部
    KafkaConsumer#poll()
    呼叫,以指定呼叫的最大阻塞时间
    poll()

因此,这两个值都不会更频繁地帮助心跳。

Kafka
Streams在处理记录时遵循“深度优先”策略。这意味着,在

poll()
每个记录之后,将执行拓扑的所有运算符。假设您有三个连续的映射,则在处理下一个/第二个记录之前,将为第一条记录调用所有三个映射。

这样,在完全处理

poll()
完第一个记录的所有记录之后,将进行下一个调用
poll()
。如果您想更频繁地发送心跳信号,则需要确保单个
poll()
调用可获取较少的记录,从而处理所有记录所需的时间更少,并且下一个记录
poll()
将更早触发。

您可以使用配置参数

KafkaConsumer
来指定通过它
StreamsConfig
来完成此 *** 作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX,VALUE);

  • max.poll.records
    :如果减小此值,将轮询较少的记录
  • session.timeout.ms
    :如果增加此值,则会有更多的时间来处理数据(为了完整性起见,添加此时间是因为它实际上是客户端设置,而不是服务器/经纪人端配置-即使您知道此解决方案并且不喜欢它: ))

编辑

从Kafka开始

0.10.1
,可以(并建议)在stream
config中为consumer和procuder配置添加前缀。这避免了参数冲突,因为某些参数名称用于使用者和生产者,并且不能以其他方式区分(并且将同时应用于使用者

生产者)。要给参数加上前缀,可以分别使用
StreamsConfig#consumerPrefix()
StreamsConfig#producerPrefix()
。例如:
streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER),VALUE);

要添加的另一件事:这个问题中描述的方案是一个已知问题,并且已经有KIP-62引入了用于

KafkaConsumer
发送心跳的后台线程,从而将心跳与
poll()
呼叫分离。Kafka
Streams将在即将发布的版本中利用此新功能。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5429371.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存