Flink实战——每隔5秒,统计最近10秒的窗口数据

Flink实战——每隔5秒,统计最近10秒的窗口数据,第1张

Flink实战——每隔5秒,统计最近10秒的窗口数据

 Flink程序的基本构建块是流和转换(请注意,Flink的DataSet API中使用的DataSet也是内部流 )。

1、实时需求 每隔5秒,统计最近10秒的窗口数据 2、开发环境部署 1. 官网建议使用 IDEA , IDEA 默认集成了 Java 和 Maven ,使用起来方便 2. 本次使用了 Flink-1.12 版本 3 、实时代码开发
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);

        //DataSource *** 作
        DataStreamSource sourceStream = env.socketTextStream("192.168.153.10", 6666);

        //通过匿名内部类的方式实现flatMap算子
        final SingleOutputStreamOperator> flatMapStream = sourceStream.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String line, Collector> collector) throws Exception {
                final String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });
        //keyBy分组 *** 作
        final KeyedStream, String> keyedStream = flatMapStream.keyBy(value -> value.f0);
        //每隔5秒,统计最近10秒的窗口数据
        WindowedStream, String, TimeWindow> window = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        //sum求和 *** 作
        final SingleOutputStreamOperator> result = window.sum(1);

        //输出结果
        result.print();

        //执行程序
        env.execute("StreamWordCount");
    }
}

 实时代码开发需连接集群,具体集群搭建方式参考Flink集群部署

4、离线需求 对文件中的单词内容进行统计计数 5、离线代码开发
class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //创建数据源
        final DataSource source = env.fromElements("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");
        //Transformation *** 作
        final FlatMapOperator> flatMap = source.flatMap(new FlatMapClass());
        final AggregateOperator> result = flatMap.groupBy(0).sum(1);

        //输出结果
        result.print();
    }

    private static class FlatMapClass implements FlatMapFunction> {
        @Override
        public void flatMap(String line, Collector> collector) throws Exception {
            final String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2(word, 1));
            }
        }
    }
}

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679140.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)