为什么在实时计算中定义UDF函数中open方法会执行多次?

为什么在实时计算中定义UDF函数中open方法会执行多次?,第1张

为什么在实时计算中定义UDF函数中open方法会执行多次?

实时UDF中open方法执行次数
      • 描述背景:
      • 执行报错:
      • 问题排查:
        • 排查:
        • 疑惑:
        • 解决:
      • 引申:
      • 总结:

描述背景:

在使用blink进行开发IP匹配的时候,因为是通过UDF来实现的,所以打算在UDF的open方法中预先读取IP信息数据,【open方法对于一个实例Task只调用执行一次,算作是预处理,类似hive的UDF中setup方法】存放在集合中。然后处理集合,按照有序排序,最终使用二分查找去寻找当前IP对应的地址信息。

具体实现和背景介绍在之前的这篇文章:链接—>Hi一起来了解实时数据的IP解析吧

执行报错:

显示在集合排序的时候空指针

2021-12-23 15:00:24
java.lang.NullPointerException
	at java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320)
	at java.util.ComparableTimSort.sort(ComparableTimSort.java:202)
	at java.util.Arrays.sort(Arrays.java:1246)
	at java.util.Arrays.sort(Arrays.java:1433)
	at java.util.List.sort(List.java:478)
	at java.util.Collections.sort(Collections.java:141)
	at com.streamCompute.udx.IP.readOss.GetIpInfoFromDisk.open(GetIpInfoFromDisk.java:86)
	at StreamExecCalcRule5.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:345)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
	at java.lang.Thread.run(Thread.java:834)
问题排查: 排查:
  1. 在代码中加入了防止空逻辑,观察open方法中是否存在空值存入集合
    现象:加入了非空判断,但是依旧报错排序空指针
  2. 在代码中加入了打印逻辑,观察open加载数据流程
    现象:发现在一个taskManager中open反复执行了四次
疑惑:
  1. open方法应该是独立的,一个实例只调用一次,这里为什么会调用四次?
  2. 调用四次为什么会导致集合数据出现空的,就算调用四次应该也会在open方法中初始化不同的四个集合
解决:

1、open方法调用四次,因为是blink中默认配置,一个TaskManager默认由4个slot组成,也就是说,其实一个实例调用一次open说法有误,应该是一个slot调用一次open方法,slot是blink中最小独立计算运行的计算单元

2、代码中将集合变量定义成了static,这个本身是类变量,但是由于4个slot都运行在一个机器上,同时也是一个JVM实例中,只是将资源划分为了4个slot来计算。由于一个JVM中static是共享变量,所以导致4个slot在并行 *** 作这个static的集合,才会导致出现空值。【在写入数据的时候不是空值,但是写入之后,其他slot将这个集合某个位置引用修改了,所以导致最后发现集合里存在空值】

3、最后解决办法也很简单,就是不要将该集合变量定义成static即可,定义成普通变量即可。这样就不会共享了。


引申:

为什么在离线平台统计的时候,可以将集合定义成static呢?链接—>Hi一起来了解下大数据平台的IP解析吧

–因为离线中一个实例下并没有slot的说法,也就是虽然这里定义了static类存在多线程一起 *** 作的隐患,但是实际中,离线的setup方法只有一个线程在 *** 作,【可以暂时理解成离线计算中,一个具体的实例就是一个slot】,这样就不存在将集合数据打乱成空值的情况了。

总结:

虽然是同样的逻辑代码,但是因为处理架构体系的不同,还是需要分别进行特别处理。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683243.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存