Flink基于Java的WordCount,根据滑动窗口动态排序实现

Flink基于Java的WordCount,根据滑动窗口动态排序实现,第1张

Flink基于Java的WordCount,根据滑动窗口动态排序实现

背景
刚学习Flink没多久,之前用Spark做WordCount实现排序很简单,因为Spark做的WC是简单的批处理,直接排序就完事了,但是Flink的流处理需要考虑到状态(Stateful),并且时间语义我选择的是ProcessingTime,走了几次坑之后终于实现。

需求
使用Flink统计Kafka的数据,需要按照滑动窗口统计最近窗口5min的数据,每1min输出一次结果。

技术选型:Java,Kafka,Flink

实现
首先需要新建一个Maven项目,pom文件如下:



    4.0.0

    org.example
    fule
    1.0-SNAPSHOT

    
        8
        8
    

    

        
            org.apache.flink
            flink-java
            1.10.1
        

        
            org.apache.flink
            flink-streaming-java_2.12
            1.10.1
        

        
            org.apache.flink
            flink-connector-kafka-0.11_2.12
            1.10.1
        

        
            com.alibaba
            fastjson
            1.2.70
        
    


Java代码

package com.jd.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


public class WordCount {

    public static void main(String[] args) throws Exception {
        // 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000);

        // kafka配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream dataStream = env.addSource(new FlinkKafkaConsumer011("test", new SimpleStringSchema(), properties));

        // 使用flatmap将生数据转成元组型,如(spark,1),(kafka,1)
        DataStream> flatMap = dataStream.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        });

        // 水位线,因为使用processingTime,所以watermark没用到
//        flatMap.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(2)) {
//            @Override
//            public long extractTimestamp(Tuple2 element) {
//                return getCurrentWatermark().getTimestamp();
//            }
//        });

        // 按单词就行分组
        KeyedStream, Tuple> keyBy = flatMap.keyBy(0);

        // 建立滑动窗口,窗口大小5min,每1min滑动一次
        WindowedStream, Tuple, TimeWindow> window = keyBy.window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)));

        // 统计单词的词频,并转成Word对象(考虑到后边要用窗口时间触发onTimer,所以将数据转成了对象)
        DataStream apply = window.apply(new WindowFunction, Word, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable> input, Collector out) throws Exception {
                long end = window.getEnd();
                int count = 0;
                String word = input.iterator().next().f0;
                for (Tuple2 tuple2 : input) {
                    count += tuple2.f1;
                }
                out.collect(Word.of(word, end, count));
            }
        });

        // 按照windowEnd属性分组
        KeyedStream windowEnd = apply.keyBy("windowEnd");

        // 注意此步使用 KeyedProcessFunction 而不是 ProcessFunction ,因为 State 和 Timers 只能 keyedStream 触发
        SingleOutputStreamOperator> process = windowEnd.process(new KeyedProcessFunction>() {

            private transient ValueState> valueState;

            // 设置State
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor> VSDescriptor = new ValueStateDescriptor<>("list-state1",
                        TypeInformation.of(new TypeHint>() {
                        })
                );
                valueState = getRuntimeContext().getState(VSDescriptor);
            }

            // 处理State,按条件触发ontimer
            @Override
            public void processElement(Word value, Context ctx, Collector> out) throws Exception {
                List buffer = valueState.value();
                if (buffer == null) {
                    buffer = new ArrayList<>();
                }
                buffer.add(value);
                valueState.update(buffer);
                // 触发条件:滑动窗口的窗口结束时间+1
                // 比如,窗口时间是1000-2000,数据的windowEnd是2000,
                // 如果下一条进来的windowEnd是3000,就会触发1000-2000窗口的onTimer
                ctx.timerService().registerProcessingTimeTimer(value.getWindowEnd() + 1);
            }

            // 触发,对valueState中的数据按词频从大到小排序且输出
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector> out) throws Exception {
                List value = valueState.value();
                value.sort((a, b) -> (int) (b.getCount() - a.getCount()));
                valueState.clear();
                out.collect(value);
            }
        });

        // 打印出来
        process.print();
        // 执行任务,不执行不会做任何 *** 作
        env.execute("WordCount");
    }

}

package com.jd.wordcount;



public class Word {

    private String word;
    private long windowEnd;
    private int count;

    public static Word of(String word, long windowEnd,int count) {
        Word word1 = new Word();
        word1.word = word;
        word1.windowEnd = windowEnd;
        word1.count = count;
        return word1;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "Word{" +
                "word='" + word + ''' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}

执行结果

①前三条是0-1min发送的
②第四条是2-3min发送的
③第五条是5-6min发送的
flink任务每一分钟执行一次统计前五分钟的数据结果如下

说明:
第一条输出统计的①
第二条输出统计的①
第三四五输出统计的①②
第六条输出统计的②③

希望可以帮助你

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676135.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存