Flink
介绍
1:事件驱动(Event-driven)
2:基于流处理
一切皆由流组成,离线数据是有界的流;实时数据是一个没有界限的流。(有界流、无界流)
3:分层API
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活
使用maven创建flink工程。添加依赖。
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.10.1version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.12artifactId>
<version>1.10.1version>
dependency>
三:wordCount
离线处理:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
// 自定义类,实现FlatMapFunction接口
// 第一个参数是接受的数据类型,第二个参数是输出结果的类型
class ImplFlatMapFunc implements FlatMapFunction<String, Tuple2<String, Integer>> {
/**
* value 接受需要处理的数据
* out 收集器,收集数据,然后输出
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 读取文件文件数据
String inputPath = "G:\bigData\flink\file\hello.txt";
// 获取数据集
DataSet<String> inputDataSet = env.readTextFile(inputPath);
// groupBy(0) 按照第一个位置的word分组
// sum(1) 按照第二个位置的单词数聚合
DataSet<Tuple2<String, Integer>> result = inputDataSet.flatMap(new ImplFlatMapFunc()).groupBy(0).sum(1);
result.print();
}
}
输出:
(scala,1)
(you,2)
(flink,1)
(world,1)
(hello,4)
(and,1)
(thank,1)
(fine,1)
(spark,1)
流处理环境:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置线程数,默认是电脑的核数。
// env.setParallelism(1);
// 读取文件
String inputPath = "G:\bigData\flink\file\hello.txt";
DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 按照keyBy(0) 第一个字段进行分组
// sum(1) 第一个字段进行数据聚合
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
.keyBy(0)
.sum(1);
resultStream.print();
// 开始执行任务
env.execute();
}
}
输出:
默认多线程情况下,流式计算是有状态的计算,会记录每一次的统计结果。8>
表示不同的线程数。
8> (and,1)
5> (fine,1)
5> (you,1)
1> (scala,1)
5> (you,2)
3> (thank,1)
1> (spark,1)
7> (flink,1)
5> (world,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
通过nc实现真正的流式处理
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class NCWordCount {
public static void main(String[] args) throws Exception {
// 设置流式运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置程序并行度
env.setParallelism(1);
// 接受程序运行时参数
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");
// 设置程序数据来源
DataStream<String> inputDataStream = env.socketTextStream(host, port);
// 处理流式数据
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
.keyBy(0)
.sum(1);
// 打印数据
resultStream.print();
// 执行程序
env.execute();
}
}
三:Flink
架构
3.1:运行时组件
3.1.1:JobManager
作业管理器
每个程序都有一个对应的JobManager
,控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager
所控制执行。
JobManager
会先接收到要执行的应用程序,这个应用程序会包括:
作业图(JobGraph)
逻辑数据流图(logical dataflow graph)
打包了所有的类、库和其它资源的JAR
包。
JobManager会把JobGraph
转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph)
,包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)
请求执行任务必要的资源,也就是任务管理器(TaskManager)
上的插槽(slot
)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager
上。
在运行过程中,JobManager
会负责所有需要中央协调的 *** 作,比如说检查点(checkpoints)
的协调。
ResourceManager
资源管理器.
主要负责管理任务管理器(TaskManager)
的插槽(slot)
,TaskManger
插槽是Flink
中定义的处理资源单元。Flink
为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s
以及standalone
部署。当JobManager
申请插槽资源时,ResourceManager
会将有空闲插槽的TaskManager
分配给JobManager
。如果ResourceManager
没有足够的插槽来满足JobManager
的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager
进程的容器。
另外,ResourceManager
还负责终止空闲的TaskManager
,释放计算资源。
TaskManager
任务管理器
TaskManager
的个数 = Job
的最大并行度 /
每个TaskManager
分配的任务槽数。
每个TaskManager
分配的任务槽数可以通过--yarnslots
参数来指定.
一个工作进程会有多个TaskManager
运行,每一个TaskManager
都包含了一定数量的插槽(slots
,又称TasksSlot
)。插槽的数量限制了TaskManager
能够执行的任务数量。启动之后,TaskManager
会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager
就会将一个或者多个插槽提供给JobManager
调用。JobManager
可以向插槽分配任务(tasks
)来执行了。
在执行过程中,一个TaskManager
可以跟其它运行同一应用程序的TaskManager
交换数据.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5TeRMJC7-1650611267401)(flink.assets/image-20210518120117278.png)]
3.1.4:Dispatcher
分发器
可以跨作业运行,它为应用提交提供了``REST``接口。
当一个应用被提交执行时,分发器就会启动并将应用移交给一个``JobManager``。由于是``REST``接口,所以``Dispatcher``可以作为集群的一个``HTTP``接入点,这样就能够不受防火墙阻挡。``Dispatcher``也会启动一个``Web UI``,用来方便地展示和监控作业执行的信息。
``Dispatcher``在架构中可能并不是必需的,这取决于应用提交运行的方式。
3.2:任务提交流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JiEEtant-1650611267403)(flink.assets/image-20210518113604910.png)]
ps:上图中7.指TaskManager为JobManager提供slots,8.表示JobManager提交要在slots中执行的任务给TaskManager。
3.3:yarn运行流程[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tq0ROj66-1650611267404)(flink.assets/image-20210518113709980.png)]
- Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
- 之后客户端向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster
- ApplicationMaster启动后加载Flink的Jar包和配置构建环境,去启动JobManager,之后JobManager向Flink自身的RM进行申请资源,自身的RM向Yarn 的ResourceManager申请资源(因为是yarn模式,所有资源归yarn RM管理)启动TaskManager
- Yarn ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
- NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
-
一个特定算子的子任务
(subtask)
的个数被称之为其并行度(parallelism)
,我们可以对单独的每个算子进行设置并行度,也可以直接用env
设置全局的并行度,更可以在页面中去指定并行度。 -
最后,由于并行度是实际
Task Manager
处理task
的能力,而一般情况下,一个stream
的并行度,可以认为就是其所有算子中最大的并行度,则可以得出在设置slot
时,在所有设置中的最大设置的并行度大小则就是所需要设置的slot
的数量。(如果Slot分组,则需要为每组slot
并行度最大值的和)
4.3.3 程序和数据流(DataFlow)
四:流处理 4.1:创建执行环境[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OcYnKj13-1650611267404)(flink.assets/image-20220103200902904.png)]
创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
4.2:Source 读取数据。
4.2.1:从集合中读取数据import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class Source_Collection {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据顺序输出
environment.setParallelism(1);
// 从集合中读取数据
DataStream<SensorReading> streamSource = environment.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
// fromElements 读取数据
DataStream<SensorReading> fromElements = environment.fromElements(
new SensorReading("se", 12L, 2.9),
new SensorReading("se_2", 12L, 2.9),
new SensorReading("se_7", 12L, 2.9),
new SensorReading("se_9", 12L, 2.9),
new SensorReading("se_11", 12L, 2.9));
// 设置流的名称
streamSource.print("data");
fromElements.print("int");
// 执行
environment.execute();
}
}
输出:
data> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
data> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
int> SensorReading{temperature=2.9, timestamp=12, id='se'}
data> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_2'}
data> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_9'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_11'}
4.2.3:从文件读取数据
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Source_File {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// 读取
DataStream<String> textFile = environment.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
// 输出
textFile.print("TF");
// 执行
environment.execute();
}
}
4.2.4:从kafka
读取数据
引入kafka依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class Source_Kafka {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置kafka连接设置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9002");
// 连接kafka
DataStream<String> textFile = environment.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
// 输出
textFile.print("Kafka");
// 执行
environment.execute();
}
}
4.2.4:自定义数据源
会源源不断的生成新的数据。
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.HashMap;
import java.util.Random;
public class Source_Define {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> dataStream = environment.addSource(new MySensorSource());
dataStream.print();
environment.execute();
}
// 实现自定义的SourceFunction
public static class MySensorSource implements SourceFunction<SensorReading> {
// 标示位,控制数据产生
private volatile boolean running = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
//定义一个随机数发生器
Random random = new Random();
// 设置10个传感器的初始温度
HashMap<String, Double> sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; ++i) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (running) {
for (String sensorId : sensorTempMap.keySet()) {
// 在当前温度基础上随机波动
double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
}
// 控制输出频率
Thread.sleep(2000L);
}
}
@Override
public void cancel() {
this.running = false;
}
}
}
4.3:Transform
转换算子。
4.3.1:基本转换算子 map, flatMap, filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Transform_map {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
// 返回每条数据的长度,使用lambda表达式
DataStream<Integer> mapStream = dataStream.map(String::length);
// 切分每条数据
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields = value.split(",");
for (String field : fields) {
out.collect(field);
}
}
});
// 筛选是否以某个单词开头
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("sensor_1");
}
});
// 打印输出
mapStream.print("map");
flatMapStream.print("flatMap");
filterStream.print("filter");
env.execute();
}
}
输出:
map> 24
flatMap> sensor_1
flatMap> 1547718199
flatMap> 35.8
filter> sensor_1,1547718199,35.8
map> 24
flatMap> sensor_1
flatMap> 1547718299
flatMap> 34.8
filter> sensor_1,1547718299,34.8
map> 24
flatMap> sensor_6
flatMap> 1547718201
flatMap> 15.4
map> 24
flatMap> sensor_6
flatMap> 1547718202
flatMap> 25.4
map> 23
flatMap> sensor_7
flatMap> 1547718202
flatMap> 6.7
map> 24
flatMap> sensor_7
flatMap> 1547718203
flatMap> 16.7
map> 25
flatMap> sensor_10
flatMap> 1547718205
flatMap> 38.1
filter> sensor_10,1547718205,38.1
map> 25
flatMap> sensor_10
flatMap> 1547718206
flatMap> 38.3
filter> sensor_10,1547718206,38.3
4.3.2:聚合算子
DataStream里没有reduce和sum这类聚合 *** 作的方法,因为Flink设计中,所有数据必须先分组才能做聚合 *** 作。先keyBy得到KeyedStream,然后调用其reduce、sum等聚合 *** 作方法。(先分组后聚合).
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Transform_rolling {
public static void main(String[] args) throws Exception {
// 创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取数据
DataStream<String> dataStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 解析文件数据
DataStream<SensorReading> sensorStream = dataStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 根据数据的id进行分组
KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
// 滚动聚合,max和maxBy区别在于,maxBy除了用于max比较的字段以外,其他字段也会更新成最新的,而max只有比较的字段更新,其他字段不变
DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
// 输出
keyedStream.print("keyedStream: ");
resultStream.print("resultStream: ");
// 执行
env.execute();
}
}
keyedStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
keyedStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
keyedStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
keyedStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
resultStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
resultStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
4.3.3:reduce
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Transform_reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
DataStream<SensorReading> sensorStream = dataStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 分组
KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
// reduce,自定义规约函数,获取max温度的传感器信息以外,时间戳要求更新成最新的
DataStream<SensorReading> resultStream = keyedStream.reduce(
(oldValue, newValue) -> new SensorReading(newValue.getId(), newValue.getTimestamp(), Math.max(oldValue.getTemperature(), newValue.getTemperature()))
);
resultStream.print("result");
env.execute();
}
}
4.3.4:多流转换算子
分流:
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Transform_splitStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 转换成SensorReading
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
} );
// 创建分流标记
OutputTag<SensorReading> high = new OutputTag<SensorReading>("high"){
private static final long serialVersionUID = 1L;
};
OutputTag<SensorReading> low = new OutputTag<SensorReading>("low"){
private static final long serialVersionUID = 1L;
};
// 分流
SingleOutputStreamOperator<SensorReading> processStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx,
Collector<SensorReading> out) {
if (value.getTemperature() > 30) {
ctx.output(high, value);
}else {
ctx.output(low, value);
}
}
});
// 获取侧写流
DataStream<SensorReading> highStream = processStream.getSideOutput(high);
DataStream<SensorReading> lowStream = processStream.getSideOutput(low);
// 输出
highStream.print("high");
lowStream.print("low");
// 执行
env.execute("旁路分流");
}
}
合流:
import com.dzh.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Transform_connect {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStream<String> inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");
// 转换成SensorReading数据流
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 创建分流标记
OutputTag<SensorReading> high = new OutputTag<SensorReading>("high") {
private static final long serialVersionUID = 1L;
};
OutputTag<SensorReading> low = new OutputTag<SensorReading>("low") {
private static final long serialVersionUID = 1L;
};
// 分流
SingleOutputStreamOperator<SensorReading> processStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx,
Collector<SensorReading> out) {
if (value.getTemperature() > 30) {
ctx.output(high, value);
} else {
ctx.output(low, value);
}
}
});
// 获取侧写流
DataStream<SensorReading> highStream = processStream.getSideOutput(high);
DataStream<SensorReading> lowStream = processStream.getSideOutput(low);
highStream.print("high");
lowStream.print("low");
// 合并流
SingleOutputStreamOperator<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> streams = warningStream.connect(lowStream);
SingleOutputStreamOperator<Object> resultStream = streams.map(
new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
}
);
resultStream.print();
env.execute();
}
}
输出:
high> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
high> SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
low> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
low> SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
low> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
low> SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
high> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
high> SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
(sensor_1,35.8,high temp warning)
(sensor_6,normal)
(sensor_1,34.8,high temp warning)
(sensor_6,normal)
(sensor_10,38.1,high temp warning)
(sensor_7,normal)
(sensor_10,38.3,high temp warning)
(sensor_7,normal)
- Connect 的数据类型可以不同,Connect 只能合并两个流;
- Union可以合并多条流,Union的数据结构必须是一样的;
Sink
``Flink``没有类似于``spark``中``foreach``方法,让用户进行迭代的 *** 作。虽有对外的输出 *** 作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出 *** 作。
4.5:Window
streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算 *** 作。
4.5.1:Windown
的类型
时间窗口(Time Window)
滚动时间窗口
滑动时间窗口
会话窗口
计数窗口(Count Window)
滚动计数窗口
滑动计数窗口
TimeWindow:按照时间生成Window。
CountWindow:按照指定的数据条数生成一个Window,与时间无关。
滚动窗口:
依据固定的窗口长度对数据进行切分
时间对齐,窗口长度固定,没有重叠
滑动窗口:
可以按照固定的长度向后滑动固定的距离
滑动窗口由固定的窗口长度和滑动间隔组成
可以有重叠(是否重叠和滑动距离有关系)
滑动窗口是固定窗口的更广义的一种形式,滚动窗口可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)
会话窗口:
由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口。
特点:时间无对齐。
Window API
窗口分配器——window()
方法
我们可以用.window()
来定义一个窗口,然后基于这个window去做一些聚合或者其他处理 *** 作。
注意window()方法必须在keyBy之后才能使用。keyBy是一个分组函数。
Flink提供了更加简单的.timeWindow()和.countWindow()方法,用于定义时间窗口和计数窗口。
// windowAll 全部放入同一个分区 不建议使用
/**
* keyBy 之后的分组函数包括
* timeWindow(Time) 调用滚动窗口
* timeWindow(Time, Time) 调用滑动窗口
* window() 自己传入窗口类型
* window(EventTimeSessionWindows.withGap(Time.seconds(10))) 会话窗口
* countWindow(long) 滚动
* countWindow(long, long) 滑动
*/
窗口函数
window function 定义了要对窗口中收集的数据做的计算 *** 作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)
增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction
其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流
Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行 *** 作算子的本地系统时间,与机器相关;
Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
timeWindow(Time, Time) 调用滑动窗口
window() 自己传入窗口类型
window(EventTimeSessionWindows.withGap(Time.seconds(10))) 会话窗口
countWindow(long) 滚动
countWindow(long, long) 滑动
窗口函数
window function 定义了要对窗口中收集的数据做的计算 *** 作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)
增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。
全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction
其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流
Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行 *** 作算子的本地系统时间,与机器相关;
Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)