Contents
- Flink Quick Start
- Section 1: Word Count Example (Batch)
- 1.1 Requirement
- 1.2 Implementation
- Java program
- Scala program
- Section 2: Word Count Example (Streaming)
- 2.1 Requirement
- 2.2 Implementation
- Scala program
- Java program
This chapter walks through a word-count example to get you up and running with Flink for both stream processing (Streaming) and batch processing (Batch).
Section 1: Word Count Example (Batch)

1.1 Requirement

Count how many times each word occurs in a file and write the result to an output file.
Steps:
1. Read the data source
2. Process the data
   a. Split each line of the input file on spaces
   b. Pair each word with the number 1
   c. Group by word (put identical words together)
   d. Sum within each group (accumulate the 1s attached to each word)
3. Save the result
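Before turning to Flink, the processing steps above can be sketched in plain Java. This is only an illustration of the split → pair → group → sum logic with a hypothetical in-memory input, not the Flink code used in this chapter:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Steps 2a-2d: split each line on spaces, pair each word with 1,
    // group identical words, and sum the 1s.
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : text.split("\n")) {
            for (String word : line.split(" ")) {       // a. split on spaces
                counts.merge(word, 1, Integer::sum);    // b-d. (word, 1), grouped and summed
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("hello you\nhello him"));
    }
}
```

Flink's batch API expresses exactly this pipeline, but distributes each step across parallel operators.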
1.2 Implementation

Add the dependencies to pom.xml:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.11.1</version>
    </dependency>
</dependencies>
```

Java program
```java
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountJavaBatch {
    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\data\\input\\hello.txt";
        String outputPath = "D:\\data\\output";
        // Get the Flink execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = executionEnvironment.readTextFile(inputPath);
        // (hello 1) (you 1) (hi 1) (him 1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOnes = text.flatMap(new SplitClz());
        // (hello 1) (hello 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupedWordAndOne = wordAndOnes.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> out = groupedWordAndOne.sum(1); // 1 = index of the count field
        out.writeAsCsv(outputPath, "\n", " ").setParallelism(1); // parallelism 1 -> a single output file
        executionEnvironment.execute(); // trigger execution explicitly
    }

    static class SplitClz implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : s.split(" ")) {
                collector.collect(new Tuple2<>(word, 1)); // emit (word, 1) downstream
            }
        }
    }
}
```
Input file:

Output file:
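The original screenshots are not reproduced here; for illustration, assuming hello.txt contains the two hypothetical lines below, the job writes one space-separated `word count` pair per line (the order of lines in the output file may vary):

```text
# hello.txt (input)
hello you
hello him

# output (one "word count" pair per line)
hello 2
him 1
you 1
```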
Scala program

```scala
import org.apache.flink.api.scala._

object WordCountScalaBatch {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\data\\input\\hello.txt"
    val outputPath = "D:\\data\\output"

    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = environment.readTextFile(inputPath)
    val out: AggregateDataSet[(String, Int)] = text
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    out.writeAsCsv(outputPath, "\n", " ").setParallelism(1)
    environment.execute("scala batch process")
  }
}
```

Section 2: Word Count Example (Streaming)

2.1 Requirement
Use a socket to simulate words being sent in real time. Flink receives the data as it arrives, aggregates it over a specified time window (e.g. 5 s), recomputes the summary every 1 s, and prints the result for each window.
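The windowing idea in this requirement can be sketched without Flink: given timestamped word events, count only the events whose timestamps fall inside the current 5-second window. This is a simplified, hypothetical illustration of window semantics (note that the Flink programs in this section actually compute an unwindowed running sum via `keyBy(0).sum(1)`):

```java
import java.util.HashMap;
import java.util.Map;

public class WindowedCountSketch {
    // One timestamped word event
    public static final class Event {
        final long ts;       // event time in milliseconds
        final String word;
        public Event(long ts, String word) { this.ts = ts; this.word = word; }
    }

    // Count words whose timestamp falls in [windowEnd - sizeMillis, windowEnd)
    public static Map<String, Integer> countWindow(Event[] events, long windowEnd, long sizeMillis) {
        Map<String, Integer> counts = new HashMap<>();
        for (Event e : events) {
            if (e.ts >= windowEnd - sizeMillis && e.ts < windowEnd) {
                counts.merge(e.word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Event[] events = {
            new Event(1000, "hello"), new Event(2500, "hello"), new Event(7000, "hi")
        };
        // Window covering [0, 5000): only the two "hello" events are inside
        System.out.println(countWindow(events, 5000, 5000));
    }
}
```

Re-evaluating this every second with a new `windowEnd` gives the "recompute every 1 s over the last 5 s" behavior described above.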
2.2 Implementation

Scala program

```scala
import org.apache.flink.streaming.api.scala._

object WordCountScalaStream {
  def main(args: Array[String]): Unit = {
    // Process streaming data
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get the runtime environment
    val streamData: DataStream[String] = environment.socketTextStream("hadoop102", 7777)
    val out = streamData.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    out.print()
    environment.execute()
  }
}
```

Java program
```java
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJavaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = executionEnvironment.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    for (String word : s.split(" ")) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            .keyBy(0)
            .sum(1);
        sum.print();
        executionEnvironment.execute();
    }
}
```
Before running, the following option needs to be checked in the IDE run configuration:
```shell
# In a console, listen on port 7777
nc -lp 7777
# Then type words and watch the aggregated counts in the job output
```