简单批处理、流处理【Flink学习笔记一】

简单批处理、流处理【Flink学习笔记一】,第1张

简单批处理、流处理【Flink学习笔记一】

目录

Flink 处理数据的流程:

环境准备

目的:

批处理

流处理


Flink 处理数据的流程:
1、获取执行环境;
2、加载/创建初始数据;
3、指定数据相关的转换;
4、指定计算结果的存储位置;
5、触发程序执行(流处理)。

环境准备

① 添加依赖

        
        
            org.apache.flink
            flink-java
            1.10.1
        
        
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.10.1
        

② 准备数据源

为了方便测试,在 resources 下创建一个文本作为数据源。

// hello.txt

hello flink
hello java
hello scala
hello world
hello man
i am yuyu

目的:

分别使用 批处理、流处理 API 计算输入数据源中每个单词的数量 。

批处理
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2、从文件中读取数据(加载、创建初始数据)
        String inputPath = "src\main\resources\hello.txt";
        DataSource inputDataSet = env.readTextFile(inputPath);

        // 3、对数据集进行处理,按空格分词展开,转换成 (word, 1) 二元组进行统计(指定数据相关转换)
        DataSet> resultSet = inputDataSet.flatMap(new MyflatMapper())
                // 按照上面转换的(二元组)的第一个位置的 word 分组
                .groupBy(0)
                // 将第二个位置上的数据求和
                .sum(1);

        // 4、指定计算结果的存储位置
        resultSet.print();
    }

    // 自定义类,实现 FlatMapFunction 接口
    public static class MyflatMapper implements FlatMapFunction>{

        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // 按空格分词
            String[] words = value.split(" ");
            // 遍历所有分词 word ,包成二元组,并存进collector里
            for (String word: words){
                out.collect(new Tuple2(word,1));
            }
        }
    }
}

输出结果:

 

流处理
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1、创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、从文件中读取数据(加载、创建初始数据)【方便测试都从文件读取数据】
        String inputPath = "src\main\resources\hello.txt";
        DataStreamSource inputDataStream = env.readTextFile(inputPath);

        // 基于流处理进行转换计算
        // 3、对数据进行处理,按空格分词展开,转换成 (word, 1) 二元组进行统计(指定数据相关转换)
        DataStream> resultStream= inputDataStream.flatMap(new MyflatMapper())
                // 批处理里面用 groupby 分组,流处理里面用 keyby 分组
                // 按照上面转换的(二元组)的第一个位置的 word 分组
                .keyBy(0)
                // 将第二个位置上的数据求和
                .sum(1);

        // 4、指定计算结果的存储位置
        resultStream.print();

        // 5、触发程序执行
        env.execute();
    }

    // 自定义类,实现 FlatMapFunction 接口
    public static class MyflatMapper implements FlatMapFunction>{

        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // 按空格分词
            String[] words = value.split(" ");
            // 遍历所有分词 word ,包成二元组,并存进collector里
            for (String word: words){
                out.collect(new Tuple2(word,1));
            }
        }
    }
}

输出结果:

 

输出解释:

(1)因为是流处理,所以每收集一个就计算一个,所以 hello 输出从 1 次到 5 次;

(2)输出部分:

         前面的数字 3> 可以理解为分布式计数的并行度

         在本地机器上相当于可执行的并行度,默认是机器的核数,我的电脑是4核的,所以最大是4

学习来源:尚硅谷Java版Flink(武老师清华硕士,原IBM-CDL负责人)_哔哩哔哩_bilibili

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5682614.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存