| :-- | :-- | :-- |
| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
| git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
| git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:
环境和版本本次实战的环境和版本如下:
-
JDK:1.8.0_211
-
Flink:1.9.2
-
Maven:3.6.0
-
*** 作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
-
IDEA:2018.3.5 (Ultimate Edition)
- 在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):
mvn
archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.9.2
- 现在maven工程已生成,用IDEA导入这个工程,如下图:
- 以maven的类型导入:
- 导入成功的样子:
- 项目创建成功,可以开始写代码实战了;
实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction
@Override
public void flatMap(String s, Collector
if(StringUtils.isNullOrWhitespaceonly(s)) {
System.out.println(“invalid line”);
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2
}
}
}
准备完毕,可以开始实战了,先从最简单的Socket开始。
Socket DataSourceSocket DataSource的功能是监听指定IP的指定端口,读取网络数据;
- 在刚才新建的工程中创建一个类Socket.java:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Socket {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//监听本地9999端口,读取字符串
DataStream socketDataStream = env.socketTextStream(“localhost”, 9999);
//每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来
socketDataStream
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute(“API DataSource demo : socket”);
}
}
从上述代码可见,StreamExecutionEnvironment.socketTextStream就可以创建Socket类型的DataSource,在控制台执行命令nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机9999端口;
- 在IDEA上运行Socket类,启动成功后再回到刚才执行nc -lk 9999的控制台,输入一些字符串再回车,可见Socket的功能已经生效:
- 基于集合的DataSource,API如下图所示:
- 先试试最简单的generateSequence,创建指定范围内的数字型的DataSource:
package com.bolingcavalry.api;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class GenerateSequence {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//通过generateSequence得到Long类型的DataSource
DataStream dataStream = env.generateSequence(1, 10);
//做一次过滤,只保留偶数,然后打印
dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Long aLong) throws Exception {
return 0L==aLong.longValue()%2L;
}
}).print();
env.execute(“API DataSource demo : collection”);
}
}
- 运行时会打印偶数:
- fromElements和fromCollection就在一个类中试了吧,创建FromCollection类,里面是这两个API的用法:
package com.bolingcavalry.api;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class FromCollection {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//创建一个List,里面有两个Tuple2元素
List
list.add(new Tuple2(“aaa”, 1));
list.add(new Tuple2(“bbb”, 1));
//通过List创建DataStream
DataStream
//通过多个Tuple2元素创建DataStream
DataStream
new Tuple2(“ccc”, 1),
new Tuple2(“ddd”, 1),
new Tuple2(“aaa”, 1)
);
//通过union将两个DataStream合成一个
DataStream
//统计每个单词的数量
unionDataStream
.keyBy(0)
.sum(1)
.print();
env.execute(“API DataSource demo : collection”);
}
}
- 运行结果如下:
- 下面的ReadTextFil
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
e类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadTextFile {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)