Flink cdc是目前实时同步binlog数据非常方便的工具,今天业务在消费同步的数据发现其中有很多的null数据,经过排查,总结出以下特点:
- null数据的出现有一定规律,几乎每个+U(upsert_after)类型的数据前就有一条null数据
- 数据在出现null前,同步逻辑做过更改,添加了where语句过滤数据,在此前没有过null数据
首先,我需要确定null数据是什么。查询官方文档可以知道在upsert-kafka中,会把-D(delete)类型数据写成null。
但是我已经通过参数去掉了-D类型数据,并且-D类型不可能这么有规律的出现,因此,这些null数据很大可能是-U类型数据,但是-U类型为什么会变成null呢?通过源码我们可以一探究竟:原来-U写入kafka会被直接设置成-D类型,最终成为一条null的消息体。
static void processLastRowonChangelog( RowData currentRow, boolean generateUpdateBefore, ValueStatestate, Collector out) throws Exception { RowData preRow = state.value(); RowKind currentKind = currentRow.getRowKind(); if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) { if (preRow == null) { // the first row, send INSERT message currentRow.setRowKind(RowKind.INSERT); out.collect(currentRow); } else { if (generateUpdateBefore) { preRow.setRowKind(RowKind.UPDATE_BEFORE); out.collect(preRow); } currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } // normalize row kind currentRow.setRowKind(RowKind.INSERT); // save to state state.update(currentRow); } else { // DELETE or UPDATER_BEFORE if (preRow != null) { // always set to DELETE because this row has been removed // even the the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it. preRow.setRowKind(RowKind.DELETE); // output the preRow instead of currentRow, // because preRow always contains the full content. // currentRow may only contain key parts (e.g. Kafka tombstone records). out.collect(preRow); // clear state as the row has been removed state.clear(); } // nothing to do if removing a non-existed row } }
基于以上可以做一个结论:null数据就是-U(upsert_befor)类型数据,并且是添加where条件后引入的。通过测试也可以验证在没有where条件时,写入kafka的数据并不会出现null。那么为什么同样的-U数据,只会在有where语句时才会出现在kafka中呢。最终通过查看执行计划一切真相大白!
上面的图是不加where语句时flink-cdc的执行计划,和加where语句执行计划的区别就是多了301-DorpUpdateBefore这个算子。原来flink-cdc在upsert-kafka模式下,执行计划在没有where语句时会进行优化,去掉所有的-U数据,这就合理解释了where条件导致的数据差异!
最后,解决kafka中null消息,只需要在flink sql中过滤掉binlog数据类型是-U的数据即可!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)