Flink 问题排查 - 作业部署失败 现象:作业无法正常提交与启动
可能成因
确认方法
解决措施
程序包依赖与集群依赖存在版本冲突
日志:NoSuchMethodError/
IncompatibleClassChangeError/
ClassCastException
1. 程序包中 Flink/Hadoop 相关依赖设为 provided2. 使用 Maven-Shade-Plugin
3. 调整 classloader.resolve-order
程序包缺少依赖
日志:1.NoClassDefFoundError
2.NoMatchingTableFactory
3.Could not instantiate the executor
程序包中添加 connector 依赖
程序包中添加 planner 依赖
升级 1.11 之后,程序包需要添加 flink-client 依赖
Flink Client 缺少依赖
Client 日志:Could not build the program from jar file.
NoClassDefFoundError: hadoop/jersey
Flink lib 目录添加 flink-hadoop-shade-uber-jar
Flink lib 目录添加 jersey-core-jar
集群资源不足
Client 日志:Deploy took more than 60s
Slot allocation request timed out
扩充集群资源
减少任务并行度
Flink 问题排查 - 作业运行异常 现象:作业突然停止运行且不恢复
可能成因
确认方法
解决措施
Source 算子实现方法不正确
JM 日志:作业结束于 FINISHED (SUCCEEDED) 状态
修改 Source 算子的实现,保持 while true 循环
作业重启次数达到阈值
TM 日志:restart strategy prevented it
1. 找出作业崩溃重启原因
2. 增大 RestartStrategy 阈值或者 yarn.application-attempt 阈值
JVM 内存用量超出 YARN/K8s 阈值
JM 日志:Killing container
通常因为 RocksDB 内存不受控导致,可升级为 Flink 1.11 以上版本
也可能是用户代码分配了直接内存
Flink 问题排查 - 作业处理缓慢 现象:作业输出量较稳定,但是不及预期值(5000 ~ 20000 条/秒/核)
可能成因
确认方法
解决措施
序列化、反序列化开销大
指标:CPU 使用率很高
采样:Kryo 等方法占比很高
1. 减少不必要的 rebalance
2. 换用更高效的 序列化框架
算法时间复杂度高(正则 hashCode)
指标:CPU 使用率很高
采样:用户自定义方法占比高
优化自定义方法的逻辑实现
增加作业并行度
数据倾斜
指标:某算子的不同子任务, 输入、输出指标相差很大
key 打散、rebalance、预聚合
低速外部系统
指标:CPU 利用率低
采样:外部 IO 耗时长
1. 批量存取
2. 本地缓存
3. Async I/O 异步交互
Flink 问题排查 - 作业处理缓慢 现象:作业输出量逐步减少,甚至完全无输出
可能成因
确认方法
解决措施
算子背压较高
指标:Flink UI 背压采样 显示红色(HIGH)
可根据 背压分析表 判断瓶颈算子,对其进行调优
Full GC 时间长
指标:GC 时间增长迅速
日志:GC 日志中 Full GC 频繁
1. 增加堆内存上限
2. 优化堆内存使用
数据源(Source)输出慢/发生异常
新建作业,只消费数据源,不经其他算子,输出到 Blackhole Sink
如果吞吐量仍然无法上升,则说明数据源有问题
数据目的(Sink)写入慢/发生异常
使用 Datagen Source,直接输出到数据目的
如果吞吐量仍然无法上升,则说明数据目的有问题
数据格式异常、逻辑错误
日志:存在大量数据异常报错,导致数据被丢掉或者无限重试
调整逻辑,过滤异常数据
快照太频繁
单个快照过大
堆内存中状态过多
指标:Flink UI 查看各个快照的大小以及完成时间。
指标:查看每个 TaskManager 的堆内存用量。
1. 如果快照较大,或完成时间较长,考虑减少快照频率,增大超时时间。
2. snapshotState 方法减少同步调用
3. 启用增量快照或非对齐检查点。
4. 如果用到窗口,可以减少窗口大小,增加 Sliding 窗口的移动周期。
5. 如果用到 GROUP BY,设置 Idle State Retention Time.
6. 自定义状态,设置 State TTL.
Watermark 因异常数据错乱
指标:算子的 Watermark 值远大于当前时间戳。
找出并过滤时间戳明显异常的数据(例如超过当前时间戳太久的数据)
Flink 问题排查 - 作业数据异常 现象:少量数据丢失
可能成因
确认方法
解决措施
逻辑(编程)错误
日志:观察日志中是否有异常。
采样:关闭 Operator Chaining,然后逐个算子观察指标、采样。
修复逻辑问题,从 Savepoint 重新运行作业
个别数据格式异常,造成整批次被丢弃
同上
针对异常数据做容错处理,或暂时关闭批量处理功能。
数据源的元数据改变
检查是否在运行期间改变了数据源的分区数、表定义等元数据
开启运行时分区自动发现
避免在运行期间修改数据源
数据目的不接纳某些数据
日志:检查数据目的(Sink)的相关报错和异常
放宽数据库表的限制。
过滤或补全异常数据。
Flink 问题排查 - 作业数据异常 现象:大量数据重新消费
可能成因
确认方法
解决措施
从错误的 Checkpoint/Savepoint 开始消费
日志:观察日志中 Kafka 等 Source 的 offset 是否回退
指标:观察数据 Lag 是否飙升
选择正确的 Savepoint,
重新运行作业
数据源发生异常
日志:观察日志中 Kafka 等 Source 是否存在异常
指标:观察 Kafka 各分区 Lag 是否接近
手动指定 offset,重新消费
Flink 作业性能调优 – 任务资源
性能瓶颈
评判指标
解决措施
任务并行度
指标:存在数据 Lag;Operator Subtask 之间数据量差距大
设置合适的的任务并行度,一般与 Source 的分区数相同设置优先级:算子>env>命令行>配置文件
内存
指标:1. 存在数据 Lag
2. 观察 JM/TM Metrics - Memory & GC 存在异常
增大内存
手动指定内存各区域比例
CPU
指标:机器 CPU 使用率接近 100%;存在热点方法消耗大量 CPU 时间
JStack/JProfiler 找到 CPU 消耗高的方法,进一步处理
网络
指标:Network Memory
Netty Shuffle Buffers
1.加大 Network Buffer2.开启 Operator Chain,减少数据 shuffle
3.使用大带宽网卡
Flink 作业性能调优 – 数据倾斜 优化点 – 数据去重
优化方案
优点
可能缺陷
Flink State + HashMap
实现简单
Hash 冲突导致性能下降
状态变大导致 GC 和 Checkpoint 问题
Bit Map/Roaring Bit Map
精准去重
内存占用较少
扩容比较困难
Bloom Filter
内存占用少
近似去重
Flink 作业性能调优 – 低速外部系统 优化点 – 维表 Join
Flink 作业性能调优 – 大状态 优化清单
优化点
注意事项
相关配置
Checkpoint 设置
合理设置 Checkpoint 间隔、停顿时间与超时时间
CheckpointInterval
MinPauseBetweenCheckpoints
State TTL + 批量存取
设置 State TTL,防止状态无限增长
State 批量存取,减少与状态后端交互
详见 StateTtlConfig
StateBackend 调优
超大状态情况下选择 RocksDB StateBackend 并妥善调优
开启增量 Checkpointrocksdb localdir 指定多硬盘分担写入压力
详见下页
Unaligned checkpoint
高反压情况下建议使用
State size 可能会有较大增长,导致I/O 增大,作业恢复时间变长
execution.checkpointing.
unaligned
execution.checkpointing.
alignment-timeout
Flink 作业性能调优 – 大状态 RocksDB 参数调优
MemTable 系列参数 Write Buffer Size:控制 MemTable 的阈值。
Write Buffer 越大,写放大效应越小,写性能也会改善。默认大小是 64 MB。可以根据实际情况适当增大。
Write Buffer Count:控制内存中允许保留的 MemTable 最大个数。默认为 2,建议设置到 5 左右。
Block Cache 系列参数
Block Size:增加该配置导致写入性能增强,读取性能下降,需要搭配 Block Cache Size 调整。建议生产环境调整到 16 ~ 32 KB,内存充足可以设为 128 KB。
Block Cache Size:增加 Block 该配置可以明显增加读性能。默认大小为 8 MB,建议设置到 64 ~ 256 MB。
Generic 参数
Max Open Files:决定了 RocksDB 可以打开的最大文件句柄数,默认值是 5000。如果进程的 ulimit 没有限制,建议改为 -1(无限制)。
Index 和 Bloom Filter 系列参数
Cache Index And Filter Blocks:表示是否在内存里缓存索引和过滤器 Block。建议在 Key 具有局部热点时打开。
Optimize Filter For Hits:表示是否会给 L0 生成 Bloom Filter。建议在 Key 具有局部热点时打开。
Flush 和 Compaction 相关参数
Max Bytes For Level base:表示 L1 层大小阈值。该参数太小,每层能存放的 SSTable 较少,导致层级很多,造成查找困难;该参数太大,每层 SSTable 较多,导致执行 Compaction 等 *** 作的耗时较长,此时容易出现 Write Stall(写停止)现象,造成写入中断。默认值为256MB,建议设为 target_file_size 的倍数。
Max Bytes For Level Multiplier:决定了LSM Tree 每层级的大小阈值的倍数关系。根据实际情况进行调整。
Target File Size:表示上一级的 SST 文件达到多大时触发 Compaction *** 作,默认值是 64MB(每增加一级,阈值会自动乘以 target_file_size_multiplier)。 为了减少 Compaction 的频率,生产环境可调整为 128MB 。
Thread Num:表示后台进行 Compaction 和 Flush *** 作的线程数。默认为 1,生产环境建议调大为 4 。
问题追因实践 性能分析 – JProfiler 找出热点方法
问题追因实践 性能分析 – Java Flight Recorder 生成火焰图
坚持学习雀食需要强大的意志,屏蔽外界的干扰,就是学,就是卷才能成为至强者
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)