- 问题背景:dws曝光人+场模型聚合压测,憋量20亿左右数据;问题发生现象:flink job启动后,频繁发生checkpoint失败,并且checkpoint失败原因 :Failure reason: Checkpoint was declined.问题现场日志:
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 8 for operator aggregate -> Sink: exp sink (86/160). Failure reason: Checkpoint was declined.at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) ... Caused by: org.apache.flink.util.SerializedThrowable: Failed to send data to Kafka: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ... Caused by: org.apache.flink.util.SerializedThrowable: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ... org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
- 问题发生原因描述:
问题的根本原因是kafka消息发送是批量发送,ProducerRecord会先存储到本地buffer,消息存储在这个buffer里的时长是有限制的【request.timeout.ms】,因此在消息量级比较大,存储在buffer里的消息,超过了request.timeout.ms这个设置时长,就会报上述Expiring XXX record(s) for XXX:120015 ms has passed since batch creation错误;而与此同时,我们开启了端到端的精准一次特性即事务,此时checkpoint与消息的pre commit绑定,pre commit 失败,导致checkpoint的失败,任务重启,大量消息积压;问题解决方案:
a)调整 request.timeout.ms 这个参数去满足需求,让消息在buffer里待更长的时间;
b)我们公司会给与每个生产者限速,可以提升生产者的速度,这样本地缓存的消息就不会产生积压;checkpoint失败现场截图,表现为某一个或者多个并行度checkpoint失败:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)