In the previous post we covered how to read Flink CDC results through the Stream API. In practice, though, the raw output is hard for downstream consumers to use directly and still requires a lot of String processing. This post focuses on how to shape the data the way we need at read time, in one pass, so that what we emit is cleaner and more elegant.
Without further rambling, here is the full source code. If you have questions, feel free to discuss in the comments. Thanks.
```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomizeDeserialization implements DebeziumDeserializationSchema<String> {

    private Struct structValue;

    // Extract the columns of the "before" or "after" struct into a JSONObject
    private JSONObject getTableColumns(String type) {
        Struct beforeOrAfterFields = structValue.getStruct(type);
        JSONObject beforeOrAfterJson = new JSONObject();
        if (beforeOrAfterFields != null) {
            Schema beforeOrAfterJsonSchema = beforeOrAfterFields.schema();
            List<Field> beforeOrAfterListFields = beforeOrAfterJsonSchema.fields();
            for (Field beforeOrAfterColumnField : beforeOrAfterListFields) {
                Object fieldValue = beforeOrAfterFields.get(beforeOrAfterColumnField);
                beforeOrAfterJson.put(beforeOrAfterColumnField.name(), fieldValue);
            }
        }
        return beforeOrAfterJson;
    }

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. Create a JSONObject to hold the JSON converted from the CDC record
        JSONObject resultJson = new JSONObject();

        // 2. Get the database and table name from the topic, which has the form
        //    "<server>.<database>.<table>"; note "." is a regex metacharacter,
        //    so it must be escaped as "\\."
        String topic = sourceRecord.topic();
        String[] informations = topic.split("\\.");
        String database = informations[1];
        String tableName = informations[2];

        // 3. Get the value struct of the record
        structValue = (Struct) sourceRecord.value();

        // 4. Get the operation type (CREATE / UPDATE / DELETE / READ)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        // 5. Assemble the result, including the "before" and "after" row images
        resultJson.put("database", database);
        resultJson.put("tableName", tableName);
        resultJson.put("before", getTableColumns("before"));
        resultJson.put("after", getTableColumns("after"));
        resultJson.put("crudActionType", operation.toString());

        // 6. Emit the record to the collector
        collector.collect(resultJson.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
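One easy-to-miss detail in the deserializer above is the topic split: `String.split` takes a regular expression, so the dot must be escaped as `"\\."`. With a bare `"."` every character matches and the result is an empty array. A minimal, self-contained sketch (the topic name below is a hypothetical example of Debezium's `<server>.<database>.<table>` naming):

```java
public class TopicSplitDemo {
    public static void main(String[] args) {
        // Hypothetical topic name in Debezium's "<server>.<database>.<table>" format
        String topic = "mysql_binlog_source.test_db.user_table";

        // A bare "." is a regex wildcard: every character matches, all tokens are
        // empty, and Java drops trailing empty strings, leaving an empty array
        String[] broken = topic.split(".");

        // Escaping the dot splits on the literal "." character
        String[] parts = topic.split("\\.");

        System.out.println(broken.length); // 0
        System.out.println(parts[1]);      // test_db
        System.out.println(parts[2]);      // user_table
    }
}
```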
Below are the pom.xml properties and dependencies used:
```xml
<properties>
    <flink.version>1.12.0</flink.version>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.code.findbugs</groupId>
                <artifactId>jsr305</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>force-shading</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-debezium</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
```