Add the dependency
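<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.33</version>
</dependency>

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// EQTJStreamUtil and Dxxbs_epidemic_report are the author's own classes (not shown here).
val env = StreamExecutionEnvironment.getExecutionEnvironment
println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date) + " Flink job started")

// Start date taken from the command-line arguments
val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
println(begin_date)

// Add the Kafka source
val reportStreamSouce = env.addSource(
    new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
      // Set the Kafka start position: consume from the earliest offset
      .setStartFromEarliest())
  // Parse each canal-json message into a JSONObject
  .map(JSON.parseObject(_))
  // Keep only changes to the epidemic_report table
  .filter(_.get("table") == "epidemic_report")
  // Keep only INSERT and UPDATE events
  .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
  // Map the first row of the "data" array onto the entity class
  .map(_.getJSONArray("data").getObject(0, classOf[Dxxbs_epidemic_report]))
  // .filter(_.getSet_id == "1")
  // Keep only rows created after begin_date
  .filter(_.getCreat_time > begin_date)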
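To make the filtering steps easier to follow outside a running Flink job, here is a minimal sketch that applies the same checks to a single canal-json message using fastjson only. The sample message, the object name CanalJsonFilterSketch and the helpers isWanted and firstRow are assumptions made for illustration; they are not part of the original job. Unlike the job above, the sketch also guards against a missing "type" field and a null or empty "data" array.

```scala
import com.alibaba.fastjson.{JSON, JSONObject}

object CanalJsonFilterSketch {
  // Hypothetical sample message; all field values are made up for illustration.
  val sample: String =
    """{"data":[{"id":"1","set_id":"1","creat_time":"2021-06-01 12:00:00"}],
      |"database":"demo","table":"epidemic_report","type":"INSERT","isDdl":false}""".stripMargin

  // Same conditions as the two filters in the job above, but null-safe.
  def isWanted(msg: JSONObject): Boolean =
    "epidemic_report" == msg.getString("table") &&
      Option(msg.getString("type")).exists(_.matches("INSERT|UPDATE"))

  // Returns the first changed row, or None when "data" is null or empty.
  def firstRow(msg: JSONObject): Option[JSONObject] =
    Option(msg.getJSONArray("data")).filter(!_.isEmpty).map(_.getJSONObject(0))

  def main(args: Array[String]): Unit = {
    val msg = JSON.parseObject(sample)
    if (isWanted(msg)) firstRow(msg).foreach(row => println(row.getString("creat_time")))
  }
}
```

Inside the Flink pipeline, the same idea could replace the direct getObject(0, ...) call with a flatMap over the Option, so that messages without row data are dropped instead of raising an exception.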
Parsing canal-json data with Flink