flink解析canal-json数据

flink解析canal-json数据,第1张

引入依赖

<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.33</version> </dependency>

  

val env = StreamExecutionEnvironment.getExecutionEnvironment println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行") val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args)) println(begin_date) //添加kakka数据源 val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps()) .setStartFromEarliest()) //设置消费kafka位置 .map(JSON.parseObject(_)) .filter(_.get("table")=="epidemic_report") .filter(_.get("type").toString.matches("(INSERT|UPDATE)")) .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass)) // .filter(_.getSet_id=="1") .filter(_.getCreat_time > begin_date)

 

flink解析canal-json数据

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/1006992.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-22
下一篇 2022-05-22

发表评论

登录后才能评论

评论列表(0条)

保存