Official site: https://flink.apache.org/
1. Flink Overview

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
1.2 Why Flink

Streaming data: data that arrives continuously, without end.

Goals: low latency, high throughput, accuracy, fault tolerance.
1.3 Flink Features

- Event-driven
- Bounded streams: processed with the DataSet API
- Unbounded streams: processed with the DataStream API
- Layered APIs
- Support for event time and processing time
- Exactly-once state consistency guarantees
- Low latency: millions of events per second at millisecond latency
- High availability
- Connectors to many commonly used storage systems
1.4 Flink vs. Spark Streaming

Stream processing vs. micro-batch processing; the two differ in:

- Data model
- Runtime architecture
2. Deployment

2.1 Standalone

Jobs can be deployed through the web UI, or with shell commands:

```bash
# start the cluster
./bin/start-cluster.sh
# stop the cluster
./bin/stop-cluster.sh
# submit a job
./bin/flink run -c <main class> -p <parallelism> <jar path> <JVM/program args>
# list all current jobs
./bin/flink list
# cancel a job
./bin/flink cancel <jobId>
```

2.2 YARN
Requires a Hadoop cluster. No environment was available to install one, so this is skipped.

2.3 Kubernetes

Skipped.
3. Flink Runtime Architecture

3.1 Runtime Components

3.1.1 JobManager

3.1.2 TaskManager

3.1.3 ResourceManager

3.1.4 Dispatcher

3.2 Job Submission Flow

3.3 Task Scheduling

3.4 Slots

Parallelism: the number of parallel subtasks of a particular operator is called that operator's parallelism.
In general, the parallelism of a stream can be taken to be the maximum parallelism among all of its operators.

A slot is the smallest unit of resources (CPU, memory) needed to execute one thread of a Flink computation, so the number of slots is usually set to the number of cores of the TaskManager (JVM).

Slots have a sharing-group concept: tasks belonging to different sharing groups must run in different slots, as sketched below.
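As a minimal sketch (the class name and the "heavy" group are illustrative, not from the original), an operator can be pinned to its own sharing group so that it cannot share a slot with the rest of the pipeline:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                // a simple transformation assigned to its own sharing group
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                // tasks in "heavy" cannot share slots with the default group,
                // so the job now needs slots for both groups
                .slotSharingGroup("heavy")
                .print();
        env.execute();
    }
}
```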
3.5 Programs and Dataflows

A Flink program consists of three parts: sources, transformations, and sinks.

Forms of data transfer between operators:
- One-to-one: happens only when two operators are in the same sharing group and have the same parallelism.
- Redistributing: a repartitioning operation; when the parallelism differs, the data is redistributed (for example round-robin). See the sketch below.
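A minimal sketch of the two transfer forms (class name and data are illustrative): map stays one-to-one within a subtask, while rebalance() forces a redistributing exchange:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransferFormSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements("a", "b", "c", "d")
                // map is a one-to-one transformation: each subtask's output
                // stays in the same partition, preserving order within it
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                // rebalance forces a redistributing (round-robin) exchange
                // between the map subtasks and the print subtasks
                .rebalance()
                .print();
        env.execute();
    }
}
```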
4. Stream Processing Flow

Environment => source => transform => sink

4.1 Environment (Execution Environment)
```java
// streaming execution environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// batch execution environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a local execution environment (parallelism is optional)
ExecutionEnvironment.createLocalEnvironment(parallelism);
// create a remote execution environment
ExecutionEnvironment.createRemoteEnvironment(host, port, jarPath);
```

4.2 Source
Flink can read data from a variety of data sources.

4.2.1 Reading from Collections and Elements

API: executionEnvironment.fromCollection(list);
```java
public static void main(String[] args) throws Exception {
    // create the streaming execution environment
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    // set the parallelism to 1
    executionEnvironment.setParallelism(1);
    // build test data
    List<SensorReading> list = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        list.add(new SensorReading("Sensor" + i,
                LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(),
                ThreadLocalRandom.current().nextDouble(35, 40)));
    }
    // collect data from the collection
    DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);
    // print the collection data
    sensorReadingDataStreamSource.print("sensor");
    // collect data from elements
    DataStreamSource<Integer> integerDataStreamSource = executionEnvironment.fromElements(1, 2, 3, 4, 56, 7);
    // print the data collected from elements
    integerDataStreamSource.print("element");
    // execute the Flink program
    executionEnvironment.execute();
}
```

4.2.2 Reading from Files
API: executionEnvironment.readTextFile(inputPath);
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    String inputPath = "E:\\张尧\\idea项目\\FlinkTutorial\\src\\main\\resources\\word.txt";
    DataStreamSource<String> dataStreamSource = executionEnvironment.readTextFile(inputPath);
    SingleOutputStreamOperator<Tuple2<String, Integer>> sum =
            dataStreamSource.flatMap(new WorkCount.MyFlagMapFunction()).keyBy(0).sum(1);
    sum.print();
    executionEnvironment.execute();
}
```

4.2.3 Reading from Kafka

4.2.3.1 Kafka Configuration
Download Kafka version 1.0.0 or later.

Kafka's advertised listener address must be configured (unless everything runs on the same machine). Edit config/server.properties:

```properties
advertised.listeners=PLAINTEXT://192.168.164.205:9092
```

```bash
# start Kafka (from the bin directory)
# start ZooKeeper first
./zookeeper-server-start.sh ../config/zookeeper.properties
# then start the Kafka broker
./kafka-server-start.sh ../config/server.properties
```
```java
package com.zy.flink.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceTest {
    public static void main(String[] args) throws Exception {
        // create the Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.164.205:9092");
        // properties.setProperty("group.id", "")
        // create the streaming execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        // read data from Kafka
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(
                new FlinkKafkaConsumer<String>("sourcetest", new SimpleStringSchema(), properties));
        dataStreamSource.print();
        executionEnvironment.execute();
    }
}
```

4.2.4 Custom Sources
```java
package com.zy.Flink.source;

import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;

public class UDFSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        DataStreamSource<SensorReading> sensorReadingDataStreamSource =
                executionEnvironment.addSource(new MySensorSource());
        sensorReadingDataStreamSource.print();
        executionEnvironment.execute();
    }

    public static class MySensorSource implements SourceFunction<SensorReading> {
        // flag controlling data generation
        private Boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            // define the set of sensors with random baseline temperatures
            HashMap<String, Double> map = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                map.put("sensor" + i, 60 + ThreadLocalRandom.current().nextGaussian() * 20);
            }
            while (running) {
                for (String s : map.keySet()) {
                    sourceContext.collect(new SensorReading(s,
                            LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(),
                            map.get(s) + ThreadLocalRandom.current().nextGaussian()));
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
```

4.3 Transform
Transformation operators.

4.3.1 Basic Transformation Operators

map, flatMap, and filter are the three basic transformation operators.
```java
package com.zy.Flink.transform;

import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class TransormTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        // build test data
        List<SensorReading> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new SensorReading("Sensor" + i,
                    LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(),
                    ThreadLocalRandom.current().nextDouble(35, 40)));
        }
        // collect data from the collection
        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);
        // map transform: return each SensorReading's sensor id
        SingleOutputStreamOperator<String> mapStream = sensorReadingDataStreamSource
                .map(new MapFunction<SensorReading, String>() {
                    @Override
                    public String map(SensorReading value) {
                        return value.getId();
                    }
                });
        mapStream.print();
        executionEnvironment.execute();
    }
}
```
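The remaining two basic operators follow the same pattern; a hedged fragment that plugs into the main method above (the character split and the 38-degree threshold are assumptions):

```java
// flatMap: split each sensor id into characters and emit them one by one
SingleOutputStreamOperator<String> flatMapStream = sensorReadingDataStreamSource
        .flatMap(new FlatMapFunction<SensorReading, String>() {
            @Override
            public void flatMap(SensorReading value, Collector<String> out) {
                for (char c : value.getId().toCharArray()) {
                    out.collect(String.valueOf(c));
                }
            }
        });
flatMapStream.print("flatMap");

// filter: keep only readings above an assumed 38-degree threshold
SingleOutputStreamOperator<SensorReading> filterStream = sensorReadingDataStreamSource
        .filter(new FilterFunction<SensorReading>() {
            @Override
            public boolean filter(SensorReading value) {
                return value.getTmpperature() > 38;
            }
        });
filterStream.print("filter");
```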
4.3.2 Aggregation Operators

keyBy, the rolling aggregation operators (min(), max(), sum(), minBy(), maxBy()), and reduce are the aggregation operators.
```java
package com.zy.Flink.transform;

import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransormTest3 {
    public static void main(String[] args) throws Exception {
        // create the streaming execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        executionEnvironment.setParallelism(1);
        // collect data from the collection
        DataStreamSource<SensorReading> sensorReadingDataStreamSource =
                executionEnvironment.fromCollection(TestUtil.createTestCollection());
        // key by id, then take the maximum temperature
        SingleOutputStreamOperator<SensorReading> max =
                sensorReadingDataStreamSource.keyBy("id").maxBy("tmpperature");
        // print
        max.print();
        // execute
        executionEnvironment.execute();
    }
}
```
The difference between the aggregation operators min() and minBy(): with min(), only the aggregated field holds the minimum; the other fields keep the values of the first record received for that key. minBy() emits the complete record that contains the minimum value of the field, as sketched below.
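A hedged fragment of a main method contrasting the two (it reuses TestUtil and the entity's field spelling from the listings above; imports as in those listings, plus KeyedStream and Tuple):

```java
// contrast min() and minBy() on the same keyed stream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KeyedStream<SensorReading, Tuple> keyed = env
        .fromCollection(TestUtil.createTestCollection())
        .keyBy("id");
// min(): only tmpperature tracks the minimum; the other fields keep the
// values of the first record received for the key
keyed.min("tmpperature").print("min");
// minBy(): emits the complete record containing the minimum tmpperature
keyed.minBy("tmpperature").print("minBy");
env.execute();
```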
4.3.3 Reduce

```java
package com.zy.Flink.transform;

import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransormTest4_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        DataStreamSource<SensorReading> sensorReadingDataStreamSource =
                executionEnvironment.fromCollection(TestUtil.createTestCollection());
        KeyedStream<SensorReading, String> keyedStream =
                sensorReadingDataStreamSource.keyBy(new KeySelector<SensorReading, String>() {
                    @Override
                    public String getKey(SensorReading value) throws Exception {
                        return value.getId();
                    }
                });
        // keep the current time and the maximum temperature seen so far per key
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce((value1, value2) ->
                new SensorReading(value1.getId(), System.currentTimeMillis(),
                        Math.max(value1.getTmpperature(), value2.getTmpperature())));
        reduce.print();
        executionEnvironment.execute();
    }
}
```

4.3.4 Multi-Stream Transformations
- split (removed in 1.12)
- connect (with CoMap/CoFlatMap): can only merge two streams
- union: merges any number of streams of the same type (see the sketch below)
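A minimal sketch of both operators (class name and data are illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class MultiStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> ints = env.fromElements(1, 2, 3);
        DataStream<String> strs = env.fromElements("a", "b");
        DataStream<String> more = env.fromElements("c", "d");

        // connect: joins exactly two streams, which may have different types;
        // each input keeps its own map method
        DataStream<String> connected = ints.connect(strs)
                .map(new CoMapFunction<Integer, String, String>() {
                    @Override
                    public String map1(Integer value) {
                        return "int: " + value;
                    }

                    @Override
                    public String map2(String value) {
                        return "str: " + value;
                    }
                });

        // union: merges any number of streams of the same type
        DataStream<String> unioned = connected.union(strs, more);
        unioned.print();
        env.execute();
    }
}
```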
4.3.5 Data Types

Data types supported by Flink:

- All Java and Scala basic data types (including wrapper classes)
- Java and Scala tuples
- Scala case classes
- Java POJOs (with a no-argument constructor)
- Java collections and enums
4.3.6 Implementing UDFs

- Function classes
- Anonymous functions (lambdas)
- Rich functions
4.3.7 Repartitioning

4.3.7.1 shuffle

Shuffles the partition order: data is randomly repartitioned.
4.3.7.2 keyBy

Computes the target partition from the key's hash; the same key always goes to the same partition (though keys within one partition are not necessarily the same).

4.3.7.3 global

Sends all records of the current stream to the first downstream partition (a single partition).
```java
package com.zy.flink.transform;

import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransfromTest6_partition {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // to observe repartitioning, the parallelism must not be 1
        executionEnvironment.setParallelism(4);
        // read data from a file
        SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment
                .readTextFile("E:\\张尧\\idea项目\\tl\\Flink_learn\\learn_feature\\src\\main\\resources\\sensorReading.txt")
                .map(line -> {
                    String[] split = line.split(",");
                    return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
                });
        // print the source stream
        dataStreamSource.print("input");
        // shuffle
        dataStreamSource.shuffle().print("shuffle");
        // keyBy
        dataStreamSource.keyBy("id").print("keyBy");
        // global
        dataStreamSource.global().print("global");
        // execute the job
        executionEnvironment.execute();
    }
}
```

4.4 Sink

4.4.1 Writing to Kafka
```java
package com.zy.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaSink {
    public static void main(String[] args) throws Exception {
        // create the streaming execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        // read messages from Kafka
        // create the Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.164.205:9092");
        // consume messages
        DataStreamSource<String> sourcetest = executionEnvironment.addSource(
                new FlinkKafkaConsumer<String>("sourcetest", new SimpleStringSchema(), properties));
        // write back to Kafka
        DataStreamSink<String> sinktest = sourcetest.addSink(
                new FlinkKafkaProducer<String>("192.168.164.205:9092", "sinktest", new SimpleStringSchema()));
        // execute the job
        executionEnvironment.execute();
    }
}
```
Requires the Kafka connector dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${flinks-clients.suffix.version}</artifactId>
</dependency>
```

4.4.2 Writing to Redis

Requires the Bahir Redis connector dependency (2.11 is the Scala suffix, 1.0 the connector version):

```xml
<properties>
    <flinks-clients.suffix.version>2.11</flinks-clients.suffix.version>
    <flinks-redis.version>1.0</flinks-redis.version>
</properties>

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${flinks-clients.suffix.version}</artifactId>
    <version>${flinks-redis.version}</version>
</dependency>
```
```java
package com.zy.flink.sink;

import com.zy.flink.TestUtil;
import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> sensorReadingDataStreamSource =
                executionEnvironment.fromCollection(TestUtil.createTestCollection());
        // create the Jedis pool configuration
        FlinkJedisPoolConfig flinkJedisPoolConfig =
                new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
        DataStreamSink<SensorReading> test = sensorReadingDataStreamSource.addSink(
                new RedisSink<>(flinkJedisPoolConfig, new RedisMapper<SensorReading>() {
                    // describe the Redis command to execute
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        return new RedisCommandDescription(RedisCommand.HSET, "test");
                    }

                    @Override
                    public String getKeyFromData(SensorReading sensorReading) {
                        return sensorReading.getId();
                    }

                    @Override
                    public String getValueFromData(SensorReading sensorReading) {
                        return sensorReading.getTmpperature().toString();
                    }
                }));
        executionEnvironment.execute();
    }
}
```

4.4.3 Writing to Elasticsearch

4.4.4 Writing to JDBC
That is, writing to a relational database; see the hedged sketch below.
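As a hedged sketch of the usual pattern (the MySQL URL, credentials, and sensor_temp table are assumptions, and the MySQL JDBC driver must be on the classpath), a sink can extend RichSinkFunction, open the connection in open(), and upsert in invoke():

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// hedged sketch of a JDBC sink; SensorReading accessors follow the other listings
public class JdbcSinkSketch extends RichSinkFunction<SensorReading> {
    private Connection connection;
    private PreparedStatement insertStmt;
    private PreparedStatement updateStmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        // assumed local MySQL database and table sensor_temp(id, temp)
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "123456");
        insertStmt = connection.prepareStatement(
                "INSERT INTO sensor_temp (id, temp) VALUES (?, ?)");
        updateStmt = connection.prepareStatement(
                "UPDATE sensor_temp SET temp = ? WHERE id = ?");
    }

    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        // try an update first; if no row matched, insert
        updateStmt.setDouble(1, value.getTmpperature());
        updateStmt.setString(2, value.getId());
        updateStmt.execute();
        if (updateStmt.getUpdateCount() == 0) {
            insertStmt.setString(1, value.getId());
            insertStmt.setDouble(2, value.getTmpperature());
            insertStmt.execute();
        }
    }

    @Override
    public void close() throws Exception {
        insertStmt.close();
        updateStmt.close();
        connection.close();
    }
}
```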
5. Windows

5.1 The Window Concept

A window is a way of slicing an unbounded stream into finite pieces for processing.
5.2 Window Types

- Time windows
  - Tumbling windows: time-aligned, fixed window length; each record belongs to exactly one window
  - Sliding windows: have a slide step; one record can belong to several windows
  - Session windows: not time-aligned
- Count windows
  - Tumbling count windows
  - Sliding count windows
5.3 Window API

window(), timeWindow(), and countWindow() are all windowing operations, as sketched below.
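A hedged fragment on an assumed keyedStream (TumblingProcessingTimeWindows and Time come from org.apache.flink.streaming.api.windowing; the sizes are arbitrary, and the shorthands are the legacy pre-1.12 API used throughout this post):

```java
// the three windowing calls; each returns a WindowedStream
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(15))); // general window()
keyedStream.timeWindow(Time.seconds(15));                  // tumbling time window
keyedStream.timeWindow(Time.seconds(15), Time.seconds(5)); // sliding time window
keyedStream.countWindow(10);                               // tumbling count window
keyedStream.countWindow(10, 2);                            // sliding count window
```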
5.3.2 Window Functions

Once a window has been opened, a window function performs the aggregation:

- Incremental aggregation functions: the window bucket stores only a single accumulated result, updated as each record arrives
  - ReduceFunction, AggregateFunction, …
- Full window functions: collect all of the window's data first, then compute
  - ProcessWindowFunction, WindowFunction, …
```java
package com.zy.flink.window;

import com.zy.flink.entity.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowApiTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // DataStreamSource<SensorReading> sensorReadingDataStreamSource =
        //         executionEnvironment.fromCollection(TestUtil.createTestCollection());
        SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment
                .socketTextStream("192.168.164.205", 8888)
                .map(line -> {
                    String[] splits = line.split(",");
                    return new SensorReading(splits[0], new Long(splits[1]), new Double(splits[2]));
                });
        // count records per sensor in 10-second tumbling windows
        SingleOutputStreamOperator<Integer> resultStream = dataStreamSource.keyBy("id")
                .timeWindow(Time.seconds(10))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading s, Integer integer) {
                        return integer + 1;
                    }

                    @Override
                    public Integer getResult(Integer integer) {
                        return integer;
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return integer + acc1;
                    }
                });
        resultStream.print();
        executionEnvironment.execute();
    }
}
```
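For contrast, a hedged fragment doing the same count as a full window function: a ProcessWindowFunction receives the entire window's contents at once (it reuses dataStreamSource from the listing above; the extra imports are listed in the comments):

```java
// extra imports assumed:
// org.apache.flink.api.java.functions.KeySelector
// org.apache.flink.api.java.tuple.Tuple2
// org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
// org.apache.flink.streaming.api.windowing.windows.TimeWindow
// org.apache.flink.util.Collector
SingleOutputStreamOperator<Tuple2<String, Integer>> fullWindowResult = dataStreamSource
        .keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading value) {
                return value.getId();
            }
        })
        .timeWindow(Time.seconds(10))
        .process(new ProcessWindowFunction<SensorReading, Tuple2<String, Integer>, String, TimeWindow>() {
            @Override
            public void process(String key, Context context,
                                Iterable<SensorReading> elements,
                                Collector<Tuple2<String, Integer>> out) {
                // the whole window's contents are available at once
                int count = 0;
                for (SensorReading ignored : elements) {
                    count++;
                }
                out.collect(new Tuple2<>(key, count));
            }
        });
fullWindowResult.print();
```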
5.3.3 Optional APIs

- trigger: defines when the window closes and when the computation fires and emits its result
- evictor: defines logic for removing certain records from the window
- allowedLateness: accept late data
- sideOutputLateData: route late data into a side output stream
- getSideOutput: retrieve the side output stream (a combined sketch follows this list)
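A hedged fragment chaining these options on an assumed dataStream of SensorReading (the window size, the 1-minute lateness bound, and the tag name are arbitrary choices; OutputTag is org.apache.flink.util.OutputTag):

```java
// handling late data around a window
OutputTag<SensorReading> lateTag = new OutputTag<SensorReading>("late") {};

SingleOutputStreamOperator<SensorReading> result = dataStream
        .keyBy("id")
        .timeWindow(Time.seconds(15))
        .allowedLateness(Time.minutes(1))  // keep the window open 1 extra minute
        .sideOutputLateData(lateTag)       // anything later goes to the side output
        .maxBy("tmpperature");

// getSideOutput: retrieve the stream of records that arrived too late
DataStream<SensorReading> lateStream = result.getSideOutput(lateTag);
lateStream.print("late");
```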
6. Time Semantics

There are three notions of time:

- Event Time: the time of the event itself, i.e., when the business data was produced
- Ingestion Time: the time at which the data enters Flink
- Processing Time: the time at which a Flink operator performs its computation

Setting event time:
```java
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

6.3 Watermarks
Watermarks are used to handle the late arrival of out-of-order data: when network delays or distribution cause records to arrive out of order, a watermark trades a bounded delay for correctness. A hedged sketch follows.
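A hedged fragment introducing watermarks with the legacy BoundedOutOfOrdernessTimestampExtractor API, matching the pre-1.11 style used in this post (the 2-second bound is arbitrary, and the timestamp getter name is an assumption about the SensorReading entity):

```java
// bounded-out-of-orderness watermarks with a 2-second delay
SingleOutputStreamOperator<SensorReading> withWatermarks = dataStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp(); // assumed getter, epoch millis
                    }
                });
```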
6.4 Watermark Propagation, Introduction, and Configuration