Flink SQL Kafka: Parsing Complex JSON


To extract the fields you care about from a complex JSON payload, use the ROW type: it turns nested JSON into an addressable schema, after which nested values can be referenced as field AS xx.xx.xx.

Version: Flink 1.13.0
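As a sketch of the approach (the table name, topic, and field names below are illustrative, not from the original article), a nested payload such as {"user": {"id": 1, "info": {"city": "beijing"}}} can be declared with ROW types and flattened with dotted paths in a query:

```sql
-- Hypothetical DDL: nested JSON mapped onto ROW types (Flink 1.13 JSON format)
CREATE TABLE kafka_source (
  `user` ROW<
    id BIGINT,
    info ROW<city STRING>
  >
) WITH (
  'connector' = 'kafka',
  'topic' = 'demo_topic',                          -- illustrative topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Reference nested fields with dotted paths:
SELECT `user`.id AS user_id, `user`.info.city AS city FROM kafka_source;
```

Note that `user` must be quoted with backticks because it is a reserved keyword in Flink SQL.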

References:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/json/

https://blog.csdn.net/xianpanjia4616/article/details/112690791

https://blog.csdn.net/YouLoveItY/article/details/108276799

Sending JSON can also be treated as plain string handling.

There are two options, as listed below.

1) If we intend to send custom Java objects to the producer, we need to create a serializer that implements org.apache.kafka.common.serialization.Serializer and pass that serializer class when creating the producer.

Reference code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

// Serializes any Serializable value to bytes via Java object serialization.
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer<Object> {

    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    public byte[] serialize(String topic, Object o) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            return baos.toByteArray();
        } catch (IOException e) {
            // Returning an empty payload on failure; a real implementation
            // should rethrow, e.g. as a SerializationException.
            return new byte[0];
        }
    }

    public void close() {
    }
}

And set the value serializer accordingly in the producer configuration:

<entry key="value.serializer"
       value="com.spring.kafka.PayloadSerializer" />
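The Spring XML entry above maps onto an ordinary producer property. A minimal sketch of the same configuration in plain Java (the bootstrap server address and key serializer are illustrative assumptions, not from the original article):

```java
import java.util.Properties;

public class ProducerProps {
    static Properties build() {
        Properties props = new Properties();
        // Assumed broker address for illustration.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The custom serializer class from the example above.
        props.put("value.serializer", "com.spring.kafka.PayloadSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("value.serializer"));
    }
}
```

These Properties would then be passed to the KafkaProducer constructor.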

2) No need to create a custom serializer class. Use the existing ByteArraySerializer, but follow this process during send:

Java object -> String (preferably a JSON representation instead of toString) -> byte array
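The conversion chain above can be sketched as follows. This is a minimal illustration: the toJson helper builds the JSON by hand (a real application would use a library such as Jackson or Gson), and the commented-out send line assumes a producer configured with ByteArraySerializer.

```java
import java.nio.charset.StandardCharsets;

public class JsonBytesDemo {
    // Illustrative stand-in for a proper JSON library.
    static String toJson(String name, int age) {
        return String.format("{\"name\":\"%s\",\"age\":%d}", name, age);
    }

    // String -> byte array, the form ByteArraySerializer expects.
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = toJson("alice", 30);
        byte[] payload = toBytes(json);
        // producer.send(new ProducerRecord<>("topic", payload));
        System.out.println(json + " -> " + payload.length + " bytes");
    }
}
```

On the Flink SQL side, bytes produced this way decode back into the JSON the 'format' = 'json' table expects.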


Source: http://outofmemory.cn/sjk/6422425.html
