一些澄清:
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
是提交间隔的下限,即,在提交之后,下次提交不会在此时间之前进行。基本上,Kafka Stream会在这段时间过后尝试尽快提交,但是并不能保证执行下一次提交实际上需要多长时间。StreamsConfig.POLL_MS_CONFIG
用于内部KafkaConsumer#poll()
呼叫,以指定呼叫的最大阻塞时间poll()
。
因此,这两个值都不会更频繁地帮助心跳。
Kafka
Streams在处理记录时遵循“深度优先”策略。这意味着,在
poll()每个记录之后,将执行拓扑的所有运算符。假设您有三个连续的映射,则在处理下一个/第二个记录之前,将为第一条记录调用所有三个映射。
这样,在完全处理
poll()完第一个记录的所有记录之后,将进行下一个调用
poll()。如果您想更频繁地发送心跳信号,则需要确保单个
poll()调用可获取较少的记录,从而处理所有记录所需的时间更少,并且下一个记录
poll()将更早触发。
您可以使用配置参数
KafkaConsumer来指定通过它
StreamsConfig来完成此 *** 作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs):
streamConfig.put(ConsumerConfig.XXX,VALUE);
max.poll.records
:如果减小此值,将轮询较少的记录session.timeout.ms
:如果增加此值,则会有更多的时间来处理数据(为了完整性起见,添加此时间是因为它实际上是客户端设置,而不是服务器/经纪人端配置-即使您知道此解决方案并且不喜欢它: ))
编辑
从Kafka开始
0.10.1,可以(并建议)在stream
config中为consumer和procuder配置添加前缀。这避免了参数冲突,因为某些参数名称用于使用者和生产者,并且不能以其他方式区分(并且将同时应用于使用者
和
生产者)。要给参数加上前缀,可以分别使用StreamsConfig#consumerPrefix()或StreamsConfig#producerPrefix()。例如:streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER),VALUE);
要添加的另一件事:这个问题中描述的方案是一个已知问题,并且已经有KIP-62引入了用于
KafkaConsumer发送心跳的后台线程,从而将心跳与
poll()呼叫分离。Kafka
Streams将在即将发布的版本中利用此新功能。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)