目录
Flink 处理数据的流程:
环境准备
目的:
批处理
流处理
Flink 处理数据的流程:
1、获取执行环境; 2、加载/创建初始数据; 3、指定数据相关的转换; 4、指定计算结果的存储位置; 5、触发程序执行(流处理)。环境准备
① 添加依赖
org.apache.flink flink-java1.10.1 org.apache.flink flink-streaming-java_2.121.10.1
② 准备数据源
为了方便测试,在 resources 下创建一个文本作为数据源。
// hello.txt hello flink hello java hello scala hello world hello man i am yuyu目的:
分别使用 批处理、流处理 API 计算输入数据源中每个单词的数量 。
批处理public class WordCount { public static void main(String[] args) throws Exception { // 1、创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据(加载、创建初始数据) String inputPath = "src\main\resources\hello.txt"; DataSourceinputDataSet = env.readTextFile(inputPath); // 3、对数据集进行处理,按空格分词展开,转换成 (word, 1) 二元组进行统计(指定数据相关转换) DataSet > resultSet = inputDataSet.flatMap(new MyflatMapper()) // 按照上面转换的(二元组)的第一个位置的 word 分组 .groupBy(0) // 将第二个位置上的数据求和 .sum(1); // 4、指定计算结果的存储位置 resultSet.print(); } // 自定义类,实现 FlatMapFunction 接口 public static class MyflatMapper implements FlatMapFunction >{ @Override public void flatMap(String value, Collector > out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历所有分词 word ,包成二元组,并存进collector里 for (String word: words){ out.collect(new Tuple2 (word,1)); } } } }
输出结果:
流处理public class StreamWordCount { public static void main(String[] args) throws Exception { // 1、创建流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、从文件中读取数据(加载、创建初始数据)【方便测试都从文件读取数据】 String inputPath = "src\main\resources\hello.txt"; DataStreamSourceinputDataStream = env.readTextFile(inputPath); // 基于流处理进行转换计算 // 3、对数据进行处理,按空格分词展开,转换成 (word, 1) 二元组进行统计(指定数据相关转换) DataStream > resultStream= inputDataStream.flatMap(new MyflatMapper()) // 批处理里面用 groupby 分组,流处理里面用 keyby 分组 // 按照上面转换的(二元组)的第一个位置的 word 分组 .keyBy(0) // 将第二个位置上的数据求和 .sum(1); // 4、指定计算结果的存储位置 resultStream.print(); // 5、触发程序执行 env.execute(); } // 自定义类,实现 FlatMapFunction 接口 public static class MyflatMapper implements FlatMapFunction >{ @Override public void flatMap(String value, Collector > out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历所有分词 word ,包成二元组,并存进collector里 for (String word: words){ out.collect(new Tuple2 (word,1)); } } } }
输出结果:
输出解释:
(1)因为是流处理,所以每收集一个就计算一个,所以 hello 输出从 1 次到 5 次;
(2)输出部分:
前面的数字 3> 可以理解为分布式计数的并行度
在本地机器上相当于可执行的并行度,默认是机器的核数,我的电脑是4核的,所以最大是4
学习来源:尚硅谷Java版Flink(武老师清华硕士,原IBM-CDL负责人)_哔哩哔哩_bilibili
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)