1. Add the Maven dependencies

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.1</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.7</slf4j.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
```
The Scala version chosen here is 2.11. The flink-runtime_2.11 dependency is required when running the job inside the IDEA development environment.
2. Add the main application code

```java
package com.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkWindowAvgKafkaStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // enable checkpointing every 5 seconds
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink-topic", new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple3<String, Long, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple3<String, Long, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> temp = input.iterator().next();
                        // emit the aggregated result as a (key, average, windowEnd) triple
                        Tuple3<String, Long, Long> result =
                                new Tuple3<>(temp.f0, sum / count, window.getEnd());
                        out.collect(result);
                    }
                });

        keyedStream.print("output");
        env.execute("Flink-Kafka demo");
    }
}
```
The job reads records from Kafka, transforms them, keys the transformed records, opens a window over each key, computes the average within the window, and emits the computed average together with the window end time. The window size is 10 seconds.
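Because timeWindow(Time.seconds(10)) creates tumbling event-time windows aligned to the epoch, the window a record falls into can be computed directly from its timestamp. A minimal sketch of that arithmetic (the class name is just for illustration):

```java
// Hypothetical helper showing how the 10-second tumbling window bounds are derived;
// the arithmetic matches Flink's epoch-aligned window assignment with zero offset.
public class WindowBoundsDemo {
    public static void main(String[] args) {
        long size = 10_000L;                 // window size: 10 seconds in milliseconds
        long ts = 1643685175905L;            // event timestamp taken from the sample data in step 3
        long windowStart = ts - (ts % size); // tumbling windows are aligned to the epoch
        long windowEnd = windowStart + size; // this is the value window.getEnd() reports
        System.out.println(windowStart + " .. " + windowEnd);
        // prints: 1643685170000 .. 1643685180000
    }
}
```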
3. Sample Kafka messages

```
1643685175905,machine-1,5436289024
1643685176920,machine-1,5422505984
1643685177924,machine-1,5431537664
1643685178935,machine-1,5425504256
1643685179940,machine-1,5430718464
1643685180947,machine-1,5437231104
1643685181960,machine-1,5522214912
1643685182965,machine-1,5745750016
1643685183976,machine-1,5746868224
```

Each record has the form eventTimestamp,machineId,metricValue.
The sample data can be produced by hand through the Kafka console producer, or generated with a small Java/Maven demo program that writes messages to Kafka, as sketched below.
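A minimal sketch of such a generator, assuming the kafka-clients library is on the classpath; the class name and the value range are made up for illustration, but the message format matches the sample in step 3:

```java
package com.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaMessageGenerator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // message format: eventTimestamp,machineId,metricValue
                String message = System.currentTimeMillis() + ",machine-1,"
                        + (5_400_000_000L + (long) (Math.random() * 400_000_000L));
                producer.send(new ProducerRecord<>("flink-topic", message));
                Thread.sleep(1000); // roughly one message per second, as in the sample data
            }
        }
    }
}
```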
4. Helper code (MessageWaterEmitter)

```java
package com.demo;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

    //@Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}
```
This class defines how the event-time watermark is obtained: the timestamp is parsed from the first comma-separated field of each message, and a watermark is emitted for every parseable record (punctuated watermarks).
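The parsing logic can be sanity-checked outside of a Flink job with one of the sample records from step 3; this demo class is hypothetical and not part of the original code:

```java
// Hypothetical sanity check for MessageWaterEmitter.
public class WatermarkEmitterDemo {
    public static void main(String[] args) {
        MessageWaterEmitter emitter = new MessageWaterEmitter();
        String record = "1643685175905,machine-1,5436289024";

        // The event timestamp is the first comma-separated field.
        long ts = emitter.extractTimestamp(record, -1L);
        System.out.println(ts); // 1643685175905

        // A watermark carrying the same timestamp is produced for every parseable record.
        System.out.println(emitter.checkAndGetNextWatermark(record, ts).getTimestamp()); // 1643685175905
    }
}
```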
5. Helper code (MessageSplitter)

```java
package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            // emit (machineId, metricValue); the timestamp in parts[0] is handled by MessageWaterEmitter
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
```

6. Run the program and check the output
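With the nine sample records from step 3 (plus at least one later record to push the watermark past the second window boundary), the printed output would look roughly like the following; the averages are computed from the sample values, and the exact subtask prefix depends on the parallelism:

```
output> (machine-1,5429311078,1643685180000)
output> (machine-1,5613016064,1643685190000)
```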
You can see that every 10 seconds a set of windowed averages is emitted.