【FLINK】The requested state does not exist. Check for typos in your state descriptor, or specify the

【FLINK】The requested state does not exist. Check for typos in your state descriptor, or specify the,第1张

【FLINK】The requested state does not exist. Check for typos in your state descriptor, or specify the

背景:Flink 1.13 ,Scala 2.11

在使用Flink Broadcast State Pattern的时候,发现启动的时候的出现异常

1、报错信息
Caused by: java.lang.IllegalArgumentException: The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.
	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator$ReadWriteContextImpl.getBroadcastState(CoBroadcastWithNonKeyedOperator.java:168)
	at com.flink.demo.broadcast.transform.CoBroadcastProcessFunction.processBroadcastElement(CoBroadcastProcessFunction.java:53)
	at com.flink.demo.broadcast.transform.CoBroadcastProcessFunction.processBroadcastElement(CoBroadcastProcessFunction.java:20)
	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement2(CoBroadcastWithNonKeyedOperator.java:118)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create(StreamTwoInputProcessorFactory.java:190)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUponFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
2、分析及解决  2.1 查看报错信息源码

结论:发现是在取广播状态的时候报的异常,再继续往上看是哪里调用了getBroadcastState

2.2 查看调用方法,及定义的MapStateDescriptor

 在BroadcastProcessFunction里定义了一个Descriptor


 

2.3 思考

在这里留下了疑惑,我明明有定义一个Descriptor,为什么会提示说找不到呢,是不是我定义的有什么问题呀,还是说要跟前面调用广播流定义的Descriptor一模一样?


带着这样子的疑惑我返回去看看了,前面生成广播流时定义的Descriptor,发现有两个地方不一样,一个是生成的一个参数是configBroadcastDescriptor和configDescriptor,另一个是MapStateDescriptor里面的参数那么不一样,一个是mysql-config-table另一个是config,那既然发现了不一样的地方那我们来一个个修正。

 

2.3.1 修改参数名字

统一改成configBroadcastDescriptor

结论:发现仍然还是启动报相同的错误,与这个无关

2.3.2 修改MapStateDescroptor中的name

哦豁~,发现程序正常启动了,奶思~

3、 反思

3.1 首先对广播的Descriptor有了更深入的理解,dataStream.broadcast(MapStateDescriptor)需要跟BroadcastProcessFunction中的Descriptor保持一致

3.2 在参数名的时候应该发现其实这最后指引的是两个寻址,因为本身就是两个新的单独标量

3.3 正常情况下并不需要再生成一个MapState,这里跟CoProcessFunction里的还是不一样的,谨慎的别把他们混在一起

 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705461.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存