A Roundup of Errors on Alibaba Cloud's Realtime Compute Platform (Flink)


I. Errors on the Fully Managed Realtime Compute Platform

1. Recorded on 2022.04.14

Windows are not supported in any CDC mode; combining a window with a CDC source fails with:

org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[vvp, dwd, holo_dwd_0_wangshuaizun_test_source_test]], fields=[id2, id5])
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:391)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:312)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:339)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:338)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:339)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:338)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$visitChild(FlinkChangelogModeInferenceProgram.scala:350)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:339)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun.apply(FlinkChangelogModeInferenceProgram.scala:338)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:338)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:289)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$
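
For reference, a query of roughly the following shape reproduces the error above. This is a minimal sketch, not the job from the log: the table, its columns, and the mysql-cdc connector options are all illustrative placeholders, and any source that produces update and delete changes (a Hologres binlog source, mysql-cdc, and so on) fails the same way.

-- Hypothetical CDC source; every name and option value below is a placeholder
CREATE TEMPORARY TABLE orders_cdc (
    id BIGINT,
    amount DOUBLE,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink',
    'password' = 'secret',
    'database-name' = 'demo',
    'table-name' = 'orders'
);

-- A window aggregate cannot consume the update/delete changelog produced by
-- the CDC table scan, so the planner rejects the plan with the
-- StreamPhysicalWindowAggregate TableException shown above
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(TUMBLE(TABLE orders_cdc, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;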

2. Recorded on 2022.04.22, error: java.lang.RuntimeException: [J cannot be cast to [Ljava.lang.Object;

2022-04-22 14:16:26,624 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Join(joinType=[LeftOuterJoin], where=[(device_id = device_id0)], select=[server_ ... with job vertex id f0454919fe2f8843439f26cd357c7689 (2/3)#0 (963785f0e7e4b77c735d940a8bbbe947) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: [J cannot be cast to [Ljava.lang.Object;
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:120)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:104)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:49)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
	at org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction.finishAccBundle(MiniBatchLocalGroupAggFunction.java:90)
	at org.apache.flink.table.runtime.operators.bundle.bundle.HeapInPlaceBundle.finishBundle(HeapInPlaceBundle.java:58)
	at org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator.finishBundle(AbstractMapBundleOperator.java:144)
	at org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator.processWatermark(AbstractMapBundleOperator.java:151)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
	at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632)
	at org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
	at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632)
	at org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
	at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:640)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:298)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:137)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:685)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:640)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:624)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:799)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
	at java.lang.Thread.run(Thread.java:877)
Caused by: java.lang.ClassCastException: [J cannot be cast to [Ljava.lang.Object;
	at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
	at org.apache.flink.table.runtime.typeutils.ExternalSerializer.serialize(ExternalSerializer.java:162)
	at org.apache.flink.table.dataview.NullAwareMapSerializer.serialize(NullAwareMapSerializer.java:116)
	at org.apache.flink.table.dataview.NullAwareMapSerializer.serialize(NullAwareMapSerializer.java:35)
	at org.apache.flink.table.dataview.MapViewSerializer.serialize(MapViewSerializer.java:93)
	at org.apache.flink.table.dataview.MapViewSerializer.serialize(MapViewSerializer.java:46)
	at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRawValue(AbstractBinaryWriter.java:150)
	at org.apache.flink.table.data.writer.AbstractBinaryWriter.writeRawValue(AbstractBinaryWriter.java:140)
	at org.apache.flink.table.data.writer.BinaryRowWriter.writeRawValue(BinaryRowWriter.java:27)
	at ToBinary504.innerApply_split345(Unknown Source)
	at ToBinary504.innerApply$(Unknown Source)
	at ToBinary504.apply(Unknown Source)
	at ToBinary504.apply(Unknown Source)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:245)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:144)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:50)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:118)
	... 37 more

[Cause] When the SQL plan was serialized into a JsonPlan, the class name of the collection type was not serialized, so the type information was lost on deserialization: an ArrayObjectArrayConverter was generated where an ArrayLongArrayConverter should have been, causing the runtime error.
[Workaround]
● Remove the minibatch-related parameters (a sketch of them follows below), clear state, and restart.
● A fix is planned in VVR 4.0.13 and VVR 4.1.2, scheduled for release on 2022-05-16.
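
The minibatch parameters referred to in the workaround are presumably the standard Flink SQL options sketched below (the values are examples, not taken from the job). On VVP they are set under Advanced Configuration -> More Flink Configuration, so the workaround amounts to deleting these entries there and restarting the job without state.

# Minibatch entries to remove as a workaround (example values)
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 1s
table.exec.mini-batch.size: 1000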

II. Questions and Discussion:

1. Q: Can Flink SQL be modified and redeployed without stopping the job?
A: Not at the moment; we need to discuss this requirement further with customers. Flink CEP is building this capability. A follow-up question: is dynamically changing the parallelism also a requirement?

2. Q: Does a Flink SQL change only take effect after a restart?
A: Same as question 1.

3. Q: Is there a staging (pre-release) scheme for packaging and deployment, or a gray-release testing scheme?
A: For staging and testing we recommend working with savepoints. A planned feature will allow the savepoint of job A to be used by job B, so the two jobs can run side by side for validation.

4. Q: Development involves frequent restarts, and each restart costs roughly 3 minutes of service startup time. Is there a good solution?
A: On one hand we are pushing Native Savepoint in the community, which will be ready in Flink 1.16 and reach the cloud in the second half of the year; on the other hand we are optimizing the job start/stop path.

5. Q: The Flink console frequently crashes and startup often has problems. Please recommend deployment windows in which support response is fast; waiting on tickets is basically always too slow.
A: This should be a notification problem; the symptoms basically appear during our platform releases.

6. Q: A job once failed because its state writes grew too large. The suggested fixes were setting a TTL (which must not be smaller than the aggregation period) and adding pods so that each pod holds less state. But in scenarios that require sorting, a single pod's state is inherently very large. What then?
A: [Tuning] For jobs on the Gemini state backend (the system default), there are four approaches; a configuration sketch follows at the end of this section.
a. Clean up state. Configure a TTL at the state level; state then handles the cleanup itself, and data expires once it has been in state longer than the TTL. For the parameters, see https://help.aliyun.com/document_detail/313172.html#section-ukm-5ky-kdo. For DataStream jobs, users can also delete unused state directly in code.
b. Compress state. On VVR 3.x or 4.x, configure state.backend.gemini.page.flush.local.compression: Lz4 to compress local state and reduce local disk usage.
c. Persist state to DFS (OSS in VVP).
VVR 4.x (4.0.11 and above, recommended): configure state.backend.gemini.file.cache.type: LIMITED. This checks the remaining local disk space; when less than 2 GB (the default) remains, files exceeding the quota are evicted to remote DFS or written to remote DFS directly, so the local disk effectively acts as a local file cache. To adjust the threshold, configure state.backend.gemini.file.cache.preserved-space (default 2 GB).
VVR 3.x (3.0.3 and above): configure state.backend.gemini.file.cache.type: LIMITED. Once in effect, each slot's state is capped at 18 GB (the default); data beyond that is evicted to remote DFS and read back from DFS on the next access. Because the cap applies per slot while a pod's local disk is currently limited to 20 GB, jobs with more than one slot also need state.backend.gemini.file.cache.capacity. For example, for a TM configured with 4 slots and a target of 15 GB of total state, set the value to 3750mb (4 * 3750 MB = 15 GB; use integral values only, decimal points break parsing).
Note: all of the above parameters are set under Advanced Configuration -> More Flink Configuration.
d. Increase the parallelism to get more pods, so each pod holds less state: raise the number of Task Managers under Advanced Configuration.
For jobs on the RocksDB state backend, there are two approaches: clean up state (configure a TTL as in item a, or delete unused state in DataStream code) and increase the parallelism (raise the number of Task Managers under Advanced Configuration).

7. Q: With Flink SQL reading MySQL in CDC mode, when two CDC sources are joined, what happens if one side's row does not arrive within a window? Will Flink re-read MySQL in full?
A: If no match arrives, the missing side is simply null; Flink does not go back to MySQL. The join first emits the row padded with nulls, and once the matching row arrives it retracts the null-padded row and emits the joined row downstream (a sketch follows at the end of this section). This has nothing to do with what the source is; it is simply the semantics of a dual-stream join.

Follow-up actions:
● Clarify the fast upgrade path and the accompanying service.
● Hold a further discussion on real-time feature engineering.
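
As mentioned in question 6, the Gemini options discussed there would look roughly as follows under Advanced Configuration -> More Flink Configuration. This sketch merges the VVR 3.x and 4.x variants, so keep only the lines matching your VVR version; the TTL key and every value shown are illustrative assumptions, and the documentation linked in question 6 is authoritative.

# a. State TTL (illustrative key and value; see the help.aliyun.com link in question 6)
table.exec.state.ttl: 1 d

# b. Compress local state (VVR 3.x / 4.x)
state.backend.gemini.page.flush.local.compression: Lz4

# c. VVR 4.x (4.0.11+): use the local disk as a file cache and spill to DFS/OSS
state.backend.gemini.file.cache.type: LIMITED
state.backend.gemini.file.cache.preserved-space: 2gb

# c. VVR 3.x (3.0.3+): per-slot cap plus a total budget for multi-slot TMs,
#    e.g. a 4-slot TM with a 15 GB budget: 4 * 3750mb = 15 GB
#    (integral values only; decimal points break parsing)
state.backend.gemini.file.cache.capacity: 3750mb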
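
For question 7, the sketch below illustrates the changelog a dual-stream LEFT JOIN emits; the tables and columns are made up, and both sides are assumed to be CDC tables declared elsewhere.

-- Hypothetical dual-stream join; events and devices are CDC tables
SELECT e.device_id, e.event_time, d.device_name
FROM events e
LEFT JOIN devices d ON e.device_id = d.device_id;

-- If an events row arrives before its match in devices, the join first emits
-- the row padded with null:
--     + (device_id, event_time, NULL)
-- When the matching devices row arrives, the null-padded result is retracted
-- and the joined row is emitted (the exact RowKinds depend on the version):
--     - (device_id, event_time, NULL)
--     + (device_id, event_time, device_name)
-- The join never re-reads MySQL; it only reacts to rows arriving on either stream.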

Source: http://outofmemory.cn/langs/720355.html
