Flink cdc写入kafka数据出现null的问题排查与解决

Flink cdc写入kafka数据出现null的问题排查与解决,第1张

Flink cdc写入kafka数据出现null的问题排查与解决

Flink cdc是目前实时同步binlog数据非常方便的工具,今天业务在消费同步的数据发现其中有很多的null数据,经过排查,总结出以下特点:

  1. null数据的出现有一定规律,几乎每个+U(upsert_after)类型的数据前就有一条null数据
  2. 数据在出现null前,同步逻辑做过更改,添加了where语句过滤数据,在此前没有过null数据

首先,我需要确定null数据是什么。查询官方文档可以知道在upsert-kafka中,会把-D(delete)类型数据写成null。


但是我已经通过参数去掉了-D类型数据,并且-D类型不可能这么有规律的出现,因此,这些null数据很大可能是-U类型数据,但是-U类型为什么会变成null呢?通过源码我们可以一探究竟:原来-U写入kafka会被直接设置成-D类型,最终成为一条null的消息体。

static void processLastRowonChangelog(
            RowData currentRow,
            boolean generateUpdateBefore,
            ValueState state,
            Collector out) throws Exception {
        RowData preRow = state.value();
        RowKind currentKind = currentRow.getRowKind();
        if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {
            if (preRow == null) {
                // the first row, send INSERT message
                currentRow.setRowKind(RowKind.INSERT);
                out.collect(currentRow);
            } else {
                if (generateUpdateBefore) {
                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(preRow);
                }
                currentRow.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(currentRow);
            }
            // normalize row kind
            currentRow.setRowKind(RowKind.INSERT);
            // save to state
            state.update(currentRow);
        } else {
            // DELETE or UPDATER_BEFORE
            if (preRow != null) {
                // always set to DELETE because this row has been removed
                // even the the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.
                preRow.setRowKind(RowKind.DELETE);
                // output the preRow instead of currentRow,
                // because preRow always contains the full content.
                // currentRow may only contain key parts (e.g. Kafka tombstone records).
                out.collect(preRow);
                // clear state as the row has been removed
                state.clear();
            }
            // nothing to do if removing a non-existed row
        }
    }

 

基于以上可以做一个结论:null数据就是-U(upsert_befor)类型数据,并且是添加where条件后引入的。通过测试也可以验证在没有where条件时,写入kafka的数据并不会出现null。那么为什么同样的-U数据,只会在有where语句时才会出现在kafka中呢。最终通过查看执行计划一切真相大白!

上面的图是不加where语句时flink-cdc的执行计划,和加where语句执行计划的区别就是多了301-DorpUpdateBefore这个算子。原来flink-cdc在upsert-kafka模式下,执行计划在没有where语句时会进行优化,去掉所有的-U数据,这就合理解释了where条件导致的数据差异!

最后,解决kafka中null消息,只需要在flink sql中过滤掉binlog数据类型是-U的数据即可!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5574859.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存