如何让Flink CDC的结果输出的更优雅一点?

如何让Flink CDC的结果输出的更优雅一点?,第1张

如何让Flink CDC的结果输出的更优雅一点?

上一篇博客中,我们提到了如何去通过StreamAPI访问Flink CDC的结果,但是往往很多时候,访问出来的结果都很难被下游直接使用,还需要做很多String类型数据处理,这里就单独开一篇博客来讲讲如何,在读的时候,一次性做到数据根据我们的需要去展示,也就是展示的更加简洁优雅一点。

不跟你多BB,直接上源代码,有疑问的可以评论区交流,感谢。

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomizeDeserialization implements DebeziumDeserializationSchema {

    private Struct structValue;

    private JSonObject getTableColumns(String type) {

        Struct beforeOrAfterFields = structValue.getStruct(type);
        JSonObject beforeOrAfterJson = new JSonObject();

        if (beforeOrAfterFields != null) {
            Schema beforeOrAfterJsonSchema = beforeOrAfterFields.schema();
            List beforeOrAfterListFields = beforeOrAfterJsonSchema.fields();

            for (Field beforeOrAfterColumnField : beforeOrAfterListFields) {
                Object fieldValue = beforeOrAfterFields.get(beforeOrAfterColumnField);
                beforeOrAfterJson.put(beforeOrAfterColumnField.name(), fieldValue);
            }
        }


        return beforeOrAfterJson;
    }

    

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

        //1.创建一个 JSonObject 对象,用于存储cdc里数据转换成的JsonString数据
        JSonObject resultJson = new JSonObject();

        //2.获取库名&表名

        String topic = sourceRecord.topic();

        String[] informations = topic.split("\.");
        String database = informations[1];
        String tableName = informations[2];

        structValue = (Struct) sourceRecord.value();

        //4.获取"before"&&"after"里的数据
        getTableColumns("before");
        getTableColumns("after");

        //5.获取 *** 作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        //6.收集数据到 collector里
        resultJson.put("database",database);
        resultJson.put("tableName",tableName);
        resultJson.put("before",getTableColumns("before"));
        resultJson.put("after",getTableColumns("after"));
        resultJson.put("crudActionType",operation);

        collector.collect(resultJson.toJSonString());

    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

如下是用到的pom文件


    1.12.0
    2.11.12
    2.11




    
        org.apache.flink
        flink-java
        ${flink.version}
        
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
        
    

    
        org.apache.flink
        flink-streaming-java_${scala.binary.version}
        ${flink.version}
        
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
            
                com.google.code.findbugs
                jsr305
            
            
                org.apache.flink
                force-shading
            
        
    

    
        org.apache.flink
        flink-scala_${scala.binary.version}
        ${flink.version}
    

    
        org.apache.flink
        flink-streaming-scala_${scala.binary.version}
        ${flink.version}
        
            
                log4j
                *
            
            
                org.slf4j
                slf4j-log4j12
            
        
    

    
        org.apache.flink
        flink-clients_${scala.binary.version}
        ${flink.version}
        
    

    
        com.alibaba.ververica
        flink-connector-debezium
        1.3.0
    

    
        com.alibaba
        fastjson
        1.2.62
    

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5694333.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存