Flink is one of the most popular real-time big-data frameworks today. I want to read its source code and try it out in practice, so this post walks through setting up a first Flink project.
2. Steps
2.1 Environment: JDK 1.8+ is sufficient, since Flink is mostly written in Java
2.2 Create the IDEA project: this is exactly the same as creating an ordinary Maven Java project, nothing special
2.3 pom.xml configuration
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FirstFlink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
2.4 Batch program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @Author CaiWencheng
 * @Date 2022-05-04 22:52
 */
public class WordCountBatch {

    public static void main(String[] args) throws Exception {
        // Input and output paths (backslashes must be escaped in Java string literals)
        String inPath = "E:\\IdeaProjects\\FirstFlink\\data\\input\\hello.txt";
        String outPath = "E:\\IdeaProjects\\FirstFlink\\data\\output\\output.txt";
        // Obtain the Flink batch execution environment
        ExecutionEnvironment executionEnvironment =
                ExecutionEnvironment.getExecutionEnvironment();
        // Read the file contents
        DataSet<String> text = executionEnvironment.readTextFile(inPath);
        // Split lines into words, group by word (field 0) and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> dataSet =
                text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        dataSet.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        // Trigger program execution
        executionEnvironment.execute("wordcount batch process");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
Input file hello.txt:
hello flink
hello zk
hello spark
Output file output.txt:
zk 1
flink 1
hello 3
spark 1
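To make clear what `flatMap(...).groupBy(0).sum(1)` actually computes, here is the same word count written in plain Java without Flink. This is only a minimal sketch of the pipeline's semantics, not part of the Flink job:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountPlain {

    // Counts words the same way the Flink batch pipeline does:
    // the split loop is the flatMap step, merge() is groupBy(0).sum(1)
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {      // flatMap: one record per word
                counts.merge(word, 1, Integer::sum);   // groupBy + sum: add up the 1s
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> result =
                count(new String[]{"hello flink", "hello zk", "hello spark"});
        System.out.println(result);  // prints {hello=3, flink=1, zk=1, spark=1}
    }
}
```

The only difference in the real job is that Flink distributes the grouping across parallel subtasks and writes the result as CSV instead of returning a map.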
2.5 Stream program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @Author CaiWencheng
 * @Date 2022-05-04 23:11
 */
public class WordCountStream {

    public static void main(String[] args) throws Exception {
        // Obtain the Flink streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines from a socket
        DataStreamSource<String> textStream =
                streamExecutionEnvironment.socketTextStream("hadoop2", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) {
                        // Split on whitespace (the regex needs a double backslash in Java)
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);
        // Print the running counts
        sum.print();
        // Trigger job execution
        streamExecutionEnvironment.execute("wordcount stream process");
    }
}
1) On the hadoop2 machine, first run:
# listen on port 7777
nc -lp 7777
2) Start the main method of WordCountStream
3) In the nc session on hadoop2, type:
hello flink hello flink hello hello
hello spark stream flink hello
4) Output in the IDEA console:
5> (hello,1)
13> (flink,1)
5> (hello,2)
5> (hello,3)
13> (flink,2)
5> (hello,4)
13> (flink,3)
5> (hello,5)
1> (spark,1)
5> (hello,6)
16> (stream,1)
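Unlike the batch job, `keyBy(0).sum(1)` on a stream emits an updated running total for every incoming record, which is why `hello` appears with 1, 2, 3, ... in the console; the `5>`/`13>` prefix is the index of the parallel subtask that produced the record. The incremental behavior can be sketched in plain Java (a simplified model, ignoring parallelism):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningWordCount {
    // Per-key state, analogous to the keyed state Flink holds for sum(1)
    private final Map<String, Long> state = new HashMap<>();

    // Mimics keyBy + sum on a stream: each arriving word updates its
    // keyed counter, and the new running total is emitted immediately
    long process(String word) {
        long updated = state.merge(word, 1L, Long::sum);
        System.out.println("(" + word + "," + updated + ")");
        return updated;
    }

    public static void main(String[] args) {
        RunningWordCount job = new RunningWordCount();
        for (String w : "hello flink hello flink hello hello".split("\\s")) {
            job.process(w);
        }
        // prints (hello,1) (flink,1) (hello,2) (flink,2) (hello,3) (hello,4)
    }
}
```

This matches the console trace above: each key keeps its own counter, and results for different keys interleave because they are processed by different subtasks.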
3. Summary and notes
- The first run of the streaming program from IDEA will fail, because flink-streaming-java_2.12 is declared with provided scope and is therefore missing from the runtime classpath. In the IDEA Run Configuration, check the option "Include dependencies with 'Provided' scope" (or remove the provided scope from the pom) and it will run.
- To build Flink Java batch/stream programs in IDEA, you only need the corresponding dependency, flink-java / flink-streaming-java_2.12, plus flink-clients_2.12.
- Creating the Scala version of the program works much the same as the Java version.