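Background
I only recently started learning Flink. Sorting a WordCount in Spark was easy, because the Spark version is a plain batch job and the result can be sorted directly. Flink's stream processing, however, has to take state into account (it is stateful), and I chose ProcessingTime as the time semantics. After hitting a few pitfalls, I finally got it working.
Requirements
Use Flink to count words in data read from Kafka. The counts are computed over a sliding window that covers the most recent 5 minutes, and a result is emitted every 1 minute.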
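To make the 5-minute window with a 1-minute slide concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a sliding-window assigner can map one timestamp to its windows. The class and method names are hypothetical, and the sketch assumes windows are aligned to multiples of the slide with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists every sliding window that contains a timestamp.
final class SlidingWindowSketch {

    /** Returns {start, end} pairs (end exclusive) of all windows that contain the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp
        // (assumption: windows start at multiples of the slide).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An element that arrives 30 seconds into minute 2.
        long timestamp = 2 * minute + 30_000L;
        for (long[] w : assignWindows(timestamp, 5 * minute, minute)) {
            System.out.println("[" + w[0] / minute + " min, " + w[1] / minute + " min)");
        }
    }
}
```

With a size of 5 minutes and a slide of 1 minute, every element lands in size / slide = 5 overlapping windows, which is why the same message shows up in several consecutive outputs.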
Tech stack: Java, Kafka, Flink
Implementation
First, create a new Maven project. The pom file is as follows:
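    <project>
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>fule</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.70</version>
            </dependency>
        </dependencies>
    </project>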
Java code
    package com.jd.wordcount;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            // Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            env.enableCheckpointing(60000);

            // Kafka configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
            DataStream<String> dataStream = env.addSource(
                    new FlinkKafkaConsumer011<String>("test", new SimpleStringSchema(), properties));

            // Use flatMap to turn the raw lines into tuples such as (spark,1), (kafka,1)
            DataStream<Tuple2<String, Integer>> flatMap = dataStream.flatMap(
                    new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    });

            // Watermarks: not needed here because the job uses processing time
            // flatMap.assignTimestampsAndWatermarks(
            //         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>>(Time.seconds(2)) {
            //     @Override
            //     public long extractTimestamp(Tuple2<String, Integer> element) {
            //         return getCurrentWatermark().getTimestamp();
            //     }
            // });

            // Group by word
            KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);

            // Sliding window: 5 min size, sliding every 1 min
            WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window =
                    keyBy.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)));

            // Count each word and convert the result to a Word object
            // (the window end time is needed later to register the timer for onTimer)
            DataStream<Word> apply = window.apply(
                    new WindowFunction<Tuple2<String, Integer>, Word, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple tuple, TimeWindow window,
                                          Iterable<Tuple2<String, Integer>> input,
                                          Collector<Word> out) throws Exception {
                            long end = window.getEnd();
                            int count = 0;
                            String word = input.iterator().next().f0;
                            for (Tuple2<String, Integer> tuple2 : input) {
                                count += tuple2.f1;
                            }
                            out.collect(Word.of(word, end, count));
                        }
                    });

            // Group by the windowEnd field
            KeyedStream<Word, Tuple> windowEnd = apply.keyBy("windowEnd");

            // Note: use KeyedProcessFunction here rather than ProcessFunction,
            // because keyed state and timers are only available on a KeyedStream
            SingleOutputStreamOperator<List<Word>> process = windowEnd.process(
                    new KeyedProcessFunction<Tuple, Word, List<Word>>() {
                        private transient ValueState<List<Word>> valueState;

                        // Set up the state
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            ValueStateDescriptor<List<Word>> VSDescriptor = new ValueStateDescriptor<>(
                                    "list-state1",
                                    TypeInformation.of(new TypeHint<List<Word>>() {
                                    }));
                            valueState = getRuntimeContext().getState(VSDescriptor);
                        }

                        // Buffer each element in state and register the timer that triggers onTimer
                        @Override
                        public void processElement(Word value, Context ctx, Collector<List<Word>> out) throws Exception {
                            List<Word> buffer = valueState.value();
                            if (buffer == null) {
                                buffer = new ArrayList<>();
                            }
                            buffer.add(value);
                            valueState.update(buffer);
                            // Trigger time: the end of the sliding window + 1 ms.
                            // For example, for a window covering 1000-2000 the windowEnd is 2000,
                            // so the timer fires once the processing-time clock passes 2001,
                            // by which point the counts for that window should already be buffered.
                            ctx.timerService().registerProcessingTimeTimer(value.getWindowEnd() + 1);
                        }

                        // On trigger: sort the buffered words by count, descending, and emit them
                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Word>> out) throws Exception {
                            List<Word> value = valueState.value();
                            value.sort((a, b) -> Integer.compare(b.getCount(), a.getCount()));
                            valueState.clear();
                            out.collect(value);
                        }
                    });

            // Print the result
            process.print();

            // Run the job; nothing is executed until this call
            env.execute("WordCount");
        }
    }
    package com.jd.wordcount;

    // POJO holding one word's count for one window
    public class Word {
        private String word;
        private long windowEnd;
        private int count;

        public static Word of(String word, long windowEnd, int count) {
            Word word1 = new Word();
            word1.word = word;
            word1.windowEnd = windowEnd;
            word1.count = count;
            return word1;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        public long getWindowEnd() {
            return windowEnd;
        }

        public void setWindowEnd(long windowEnd) {
            this.windowEnd = windowEnd;
        }

        @Override
        public String toString() {
            return "Word{" +
                    "word='" + word + '\'' +
                    ", windowEnd=" + windowEnd +
                    ", count=" + count +
                    '}';
        }
    }
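The ranking step (bucket the per-word counts by windowEnd, then sort each bucket by count in descending order) can be tried without a Flink cluster. The following is a rough sketch in plain Java. `Counted` is a hypothetical stand-in for the Word class, and the `TreeMap` only imitates what the keyed state does per windowEnd key; it is not how Flink stores state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free imitation of the "buffer per windowEnd, then sort" step.
final class RankPerWindowSketch {

    /** Hypothetical stand-in for the Word POJO: one word's count in one window. */
    static final class Counted {
        final String word;
        final long windowEnd;
        final int count;

        Counted(String word, long windowEnd, int count) {
            this.word = word;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    /** Buckets the entries by windowEnd and sorts each bucket by count, highest first. */
    static Map<Long, List<Counted>> rank(List<Counted> entries) {
        Map<Long, List<Counted>> byWindow = new TreeMap<>();
        for (Counted c : entries) {
            byWindow.computeIfAbsent(c.windowEnd, k -> new ArrayList<>()).add(c);
        }
        for (List<Counted> bucket : byWindow.values()) {
            bucket.sort(Comparator.comparingInt((Counted c) -> c.count).reversed());
        }
        return byWindow;
    }

    public static void main(String[] args) {
        List<Counted> input = new ArrayList<>();
        input.add(new Counted("flink", 60_000L, 2));
        input.add(new Counted("kafka", 60_000L, 5));
        input.add(new Counted("spark", 120_000L, 1));
        for (Map.Entry<Long, List<Counted>> e : rank(input).entrySet()) {
            StringBuilder line = new StringBuilder("windowEnd=" + e.getKey() + ":");
            for (Counted c : e.getValue()) {
                line.append(' ').append(c.word).append('=').append(c.count);
            }
            System.out.println(line);
        }
    }
}
```

Comparing with `Comparator.comparingInt(...).reversed()` avoids the overflow risk of sorting by the difference of two counts.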
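Results
① The first three messages were sent during minute 0-1
② The fourth message was sent during minute 2-3
③ The fifth message was sent during minute 5-6
The Flink job emits one result per minute, each covering the previous five minutes of data. The outputs break down as follows:
The first output counts ①
The second output counts ①
The third, fourth, and fifth outputs count ① and ②
The sixth output counts ② and ③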
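The breakdown above can be checked with a little arithmetic: the output emitted at the end of minute n covers the interval [n - 5, n). The sketch below is a plain Java illustration with hypothetical names. It assumes batches ①, ②, and ③ arrive at the start of minutes 0, 2, and 5 (the post only says they were sent within those minutes) and that window boundaries fall on whole minutes. Labels "1", "2", "3" stand for ①, ②, ③.

```java
// Hypothetical check of which message batches each one-minute output covers.
final class OutputCoverageSketch {

    /** Returns the labels of the batches sent within [endMinute - sizeMinutes, endMinute). */
    static String covered(int endMinute, int sizeMinutes, int[] sendMinutes, String[] labels) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < sendMinutes.length; i++) {
            boolean inWindow = sendMinutes[i] >= endMinute - sizeMinutes && sendMinutes[i] < endMinute;
            if (inWindow) {
                sb.append(labels[i]);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int[] sendMinutes = {0, 2, 5};      // assumed arrival minutes of batches 1, 2, 3
        String[] labels = {"1", "2", "3"};
        for (int end = 1; end <= 6; end++) {
            System.out.println("output " + end + " covers batches: " + covered(end, 5, sendMinutes, labels));
        }
    }
}
```

Under these assumptions, batch ① drops out at the sixth output because the window [1, 6) no longer reaches back to minute 0.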
I hope this helps.