Flink基础架构及应用

Flink基础架构及应用,第1张

一:Flink介绍

1:事件驱动(Event-driven)
2:基于流处理
一切皆由流组成,离线数据是有界的流;实时数据是一个没有界限的流。(有界流、无界流)
3:分层API
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活

二:环境搭建

​ 使用maven创建flink工程。添加依赖。

        
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-javaartifactId>
            <version>1.10.1version>
        dependency>

        
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-java_2.12artifactId>
            <version>1.10.1version>
        dependency>
三:wordCount

离线处理:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


// 自定义类,实现FlatMapFunction接口
// 第一个参数是接受的数据类型,第二个参数是输出结果的类型
class ImplFlatMapFunc implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * value 	接受需要处理的数据
     * out   	收集器,收集数据,然后输出
     */
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

public class WordCount {

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取文件文件数据
        String inputPath = "G:\bigData\flink\file\hello.txt";
        // 获取数据集
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        // groupBy(0) 按照第一个位置的word分组
        // sum(1) 按照第二个位置的单词数聚合
        DataSet<Tuple2<String, Integer>> result = inputDataSet.flatMap(new ImplFlatMapFunc()).groupBy(0).sum(1);
        result.print();
    }

}

输出:

(scala,1)
(you,2)
(flink,1)
(world,1)
(hello,4)
(and,1)
(thank,1)
(fine,1)
(spark,1)

流处理环境:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        
        // 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 设置线程数,默认是电脑的核数。
        // env.setParallelism(1);
        
        // 读取文件
        String inputPath = "G:\bigData\flink\file\hello.txt";
        DataStream<String> inputDataStream = env.readTextFile(inputPath);
        
        // 按照keyBy(0) 第一个字段进行分组
        // sum(1) 第一个字段进行数据聚合
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
                .keyBy(0)
                .sum(1);
        
        resultStream.print();
        
        // 开始执行任务
        env.execute();
    }

}

输出:

​ 默认多线程情况下,流式计算是有状态的计算,会记录每一次的统计结果。8>表示不同的线程数。

8> (and,1)
5> (fine,1)
5> (you,1)
1> (scala,1)
5> (you,2)
3> (thank,1)
1> (spark,1)
7> (flink,1)
5> (world,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)

通过nc实现真正的流式处理

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NCWordCount {

    public static void main(String[] args) throws Exception {
        // 设置流式运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置程序并行度
        env.setParallelism(1);
        // 接受程序运行时参数
        ParameterTool tool = ParameterTool.fromArgs(args);
        String host = tool.get("host");
        int port = tool.getInt("port");
        // 设置程序数据来源
        DataStream<String> inputDataStream = env.socketTextStream(host, port);
        // 处理流式数据
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new ImplFlatMapFunc())
                .keyBy(0)
                .sum(1);
        // 打印数据
        resultStream.print();
        // 执行程序
        env.execute();
    }

}
三:Flink架构 3.1:运行时组件 3.1.1:JobManager

​ 作业管理器

​ 每个程序都有一个对应的JobManager,控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。

JobManager会先接收到要执行的应用程序,这个应用程序会包括:

​ 作业图(JobGraph)
​ 逻辑数据流图(logical dataflow graph)
​ 打包了所有的类、库和其它资源的JAR包。

JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
在运行过程中,JobManager会负责所有需要中央协调的 *** 作,比如说检查点(checkpoints)的协调。

3.1.2:ResourceManager

​ 资源管理器.
​ 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
​ 另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。

3.1.3:TaskManager

​ 任务管理器

TaskManager的个数 = Job的最大并行度 / 每个TaskManager分配的任务槽数。

​ 每个TaskManager分配的任务槽数可以通过--yarnslots参数来指定.

​ 一个工作进程会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots,又称TasksSlot)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager可以向插槽分配任务(tasks)来执行了。

​ 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5TeRMJC7-1650611267401)(flink.assets/image-20210518120117278.png)]

3.1.4:Dispatcher

​ 分发器

可以跨作业运行,它为应用提交提供了``REST``接口。

当一个应用被提交执行时,分发器就会启动并将应用移交给一个``JobManager``。由于是``REST``接口,所以``Dispatcher``可以作为集群的一个``HTTP``接入点,这样就能够不受防火墙阻挡。``Dispatcher``也会启动一个``Web UI``,用来方便地展示和监控作业执行的信息。

``Dispatcher``在架构中可能并不是必需的,这取决于应用提交运行的方式。
3.2:任务提交流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JiEEtant-1650611267403)(flink.assets/image-20210518113604910.png)]

ps:上图中7.指TaskManager为JobManager提供slots,8.表示JobManager提交要在slots中执行的任务给TaskManager。

3.3:yarn运行流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tq0ROj66-1650611267404)(flink.assets/image-20210518113709980.png)]

  1. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
  2. 之后客户端向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster
  3. ApplicationMaster启动后加载Flink的Jar包和配置构建环境,去启动JobManager,之后JobManager向Flink自身的RM进行申请资源,自身的RM向Yarn 的ResourceManager申请资源(因为是yarn模式,所有资源归yarn RM管理)启动TaskManager
  4. Yarn ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
  5. NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
3.4:并行度
  1. 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism),我们可以对单独的每个算子进行设置并行度,也可以直接用env设置全局的并行度,更可以在页面中去指定并行度。

  2. 最后,由于并行度是实际Task Manager处理task的能力,而一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度,则可以得出在设置slot时,在所有设置中的最大设置的并行度大小则就是所需要设置的slot的数量。(如果Slot分组,则需要为每组slot并行度最大值的和)

4.3.3 程序和数据流(DataFlow)

四:流处理 4.1:创建执行环境

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OcYnKj13-1650611267404)(flink.assets/image-20220103200902904.png)]

​ 创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

4.2:Source

​ 读取数据。

4.2.1:从集合中读取数据
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class Source_Collection {

    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 数据顺序输出
        environment.setParallelism(1);

        // 从集合中读取数据
        DataStream<SensorReading> streamSource = environment.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_6", 1547718201L, 15.4),
                        new SensorReading("sensor_7", 1547718202L, 6.7),
                        new SensorReading("sensor_10", 1547718205L, 38.1)
                )
        );

        // fromElements 读取数据
        DataStream<SensorReading> fromElements = environment.fromElements(
                new SensorReading("se", 12L, 2.9),
                new SensorReading("se_2", 12L, 2.9),
                new SensorReading("se_7", 12L, 2.9),
                new SensorReading("se_9", 12L, 2.9),
                new SensorReading("se_11", 12L, 2.9));

        // 设置流的名称
        streamSource.print("data");
        fromElements.print("int");

        // 执行
        environment.execute();
    }

}

输出:

data> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
data> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
int> SensorReading{temperature=2.9, timestamp=12, id='se'}
data> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_2'}
data> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_7'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_9'}
int> SensorReading{temperature=2.9, timestamp=12, id='se_11'}
4.2.3:从文件读取数据
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source_File {

    public static void main(String[] args) throws Exception {
        
        // 创建环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        
        // 读取
        DataStream<String> textFile = environment.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
        
        // 输出
        textFile.print("TF");
        
        // 执行
        environment.execute();
    }
}
4.2.4:从kafka读取数据

引入kafka依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Source_Kafka {

    public static void main(String[] args) throws Exception {
        
        // 创建执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 设置kafka连接设置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9002");
        
        // 连接kafka
        DataStream<String> textFile = environment.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        
        // 输出
        textFile.print("Kafka");
        
        // 执行
        environment.execute();
    }
}
4.2.4:自定义数据源

​ 会源源不断的生成新的数据。

import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Random;

public class Source_Define {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> dataStream = environment.addSource(new MySensorSource());

        dataStream.print();

        environment.execute();
    }

    // 实现自定义的SourceFunction
    public static class MySensorSource implements SourceFunction<SensorReading> {

        // 标示位,控制数据产生
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            //定义一个随机数发生器
            Random random = new Random();
            // 设置10个传感器的初始温度
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; ++i) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
            }
            while (running) {
                for (String sensorId : sensorTempMap.keySet()) {
                    // 在当前温度基础上随机波动
                    double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                }
                // 控制输出频率
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }
}
4.3:Transform

​ 转换算子。

4.3.1:基本转换算子

map, flatMap, filter

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Transform_map {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");

        // 返回每条数据的长度,使用lambda表达式
        DataStream<Integer> mapStream = dataStream.map(String::length);

        // 切分每条数据
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // 筛选是否以某个单词开头
        DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });

        // 打印输出
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}

输出:

map> 24
flatMap> sensor_1
flatMap> 1547718199
flatMap> 35.8
filter> sensor_1,1547718199,35.8
map> 24
flatMap> sensor_1
flatMap> 1547718299
flatMap> 34.8
filter> sensor_1,1547718299,34.8
map> 24
flatMap> sensor_6
flatMap> 1547718201
flatMap> 15.4
map> 24
flatMap> sensor_6
flatMap> 1547718202
flatMap> 25.4
map> 23
flatMap> sensor_7
flatMap> 1547718202
flatMap> 6.7
map> 24
flatMap> sensor_7
flatMap> 1547718203
flatMap> 16.7
map> 25
flatMap> sensor_10
flatMap> 1547718205
flatMap> 38.1
filter> sensor_10,1547718205,38.1
map> 25
flatMap> sensor_10
flatMap> 1547718206
flatMap> 38.3
filter> sensor_10,1547718206,38.3
4.3.2:聚合算子

DataStream里没有reduce和sum这类聚合 *** 作的方法,因为Flink设计中,所有数据必须先分组才能做聚合 *** 作。先keyBy得到KeyedStream,然后调用其reduce、sum等聚合 *** 作方法。(先分组后聚合).

import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_rolling {

    public static void main(String[] args) throws Exception {

        // 创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 获取数据
        DataStream<String> dataStream =  env.readTextFile("G:\bigData\flink\file\sensor.txt");

        // 解析文件数据
        DataStream<SensorReading> sensorStream = dataStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 根据数据的id进行分组
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);

        // 滚动聚合,max和maxBy区别在于,maxBy除了用于max比较的字段以外,其他字段也会更新成最新的,而max只有比较的字段更新,其他字段不变
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");

        // 输出
        keyedStream.print("keyedStream: ");
        resultStream.print("resultStream: ");

        // 执行
        env.execute();
    }
}
keyedStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
keyedStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
resultStream: > SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
keyedStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
keyedStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
keyedStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
keyedStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
resultStream: > SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
resultStream: > SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
resultStream: > SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
resultStream: > SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
4.3.3:reduce
import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_reduce {

    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> dataStream = env.readTextFile("C:\software\idea\projects\FlinkTutorial\src\main\resources\sensor.txt");
        
        DataStream<SensorReading> sensorStream = dataStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        // 分组
        KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);
        
        // reduce,自定义规约函数,获取max温度的传感器信息以外,时间戳要求更新成最新的
        DataStream<SensorReading> resultStream = keyedStream.reduce(
                (oldValue, newValue) -> new SensorReading(newValue.getId(), newValue.getTimestamp(), Math.max(oldValue.getTemperature(), newValue.getTemperature()))
        );
        
        resultStream.print("result");
        env.execute();
    }
}
4.3.4:多流转换算子

分流:

import com.dzh.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Transform_splitStream {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
        DataStream<String> inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");

        // 转换成SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        } );

        // 创建分流标记
        OutputTag<SensorReading> high = new OutputTag<SensorReading>("high"){
            private static final long serialVersionUID = 1L;
        };
        
        OutputTag<SensorReading> low = new OutputTag<SensorReading>("low"){
            private static final long serialVersionUID = 1L;
        };

        // 分流
        SingleOutputStreamOperator<SensorReading> processStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {

            @Override
            public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx,
                                       Collector<SensorReading> out) {
                if (value.getTemperature() > 30) {
                    ctx.output(high, value);
                }else {
                    ctx.output(low, value);
                }
            }
        });

        // 获取侧写流
        DataStream<SensorReading> highStream = processStream.getSideOutput(high);
        DataStream<SensorReading> lowStream = processStream.getSideOutput(low);
        
        // 输出
        highStream.print("high");
        lowStream.print("low");

        // 执行
        env.execute("旁路分流");
    }
}

合流:

import com.dzh.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Transform_connect {

    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取文件
        DataStream<String> inputStream = env.readTextFile("G:\bigData\flink\file\sensor.txt");

        // 转换成SensorReading数据流
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 创建分流标记
        OutputTag<SensorReading> high = new OutputTag<SensorReading>("high") {
            private static final long serialVersionUID = 1L;
        };
        OutputTag<SensorReading> low = new OutputTag<SensorReading>("low") {
            private static final long serialVersionUID = 1L;
        };

        // 分流
        SingleOutputStreamOperator<SensorReading> processStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {

            @Override
            public void processElement(SensorReading value, ProcessFunction<SensorReading, SensorReading>.Context ctx,
                                       Collector<SensorReading> out) {
                if (value.getTemperature() > 30) {
                    ctx.output(high, value);
                } else {
                    ctx.output(low, value);
                }
            }
        });

        // 获取侧写流
        DataStream<SensorReading> highStream = processStream.getSideOutput(high);
        DataStream<SensorReading> lowStream = processStream.getSideOutput(low);

        highStream.print("high");
        lowStream.print("low");

        // 合并流
        SingleOutputStreamOperator<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

        ConnectedStreams<Tuple2<String, Double>, SensorReading> streams = warningStream.connect(lowStream);

        SingleOutputStreamOperator<Object> resultStream = streams.map(
                new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
                    @Override
                    public Object map1(Tuple2<String, Double> value) throws Exception {
                        return new Tuple3<>(value.f0, value.f1, "high temp warning");
                    }

                    @Override
                    public Object map2(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), "normal");
                    }
                }
        );

        resultStream.print();

        env.execute();
    }
}

输出:

high> SensorReading{temperature=35.8, timestamp=1547718199, id='sensor_1'}
high> SensorReading{temperature=34.8, timestamp=1547718299, id='sensor_1'}
low> SensorReading{temperature=15.4, timestamp=1547718201, id='sensor_6'}
low> SensorReading{temperature=25.4, timestamp=1547718202, id='sensor_6'}
low> SensorReading{temperature=6.7, timestamp=1547718202, id='sensor_7'}
low> SensorReading{temperature=16.7, timestamp=1547718203, id='sensor_7'}
high> SensorReading{temperature=38.1, timestamp=1547718205, id='sensor_10'}
high> SensorReading{temperature=38.3, timestamp=1547718206, id='sensor_10'}
(sensor_1,35.8,high temp warning)
(sensor_6,normal)
(sensor_1,34.8,high temp warning)
(sensor_6,normal)
(sensor_10,38.1,high temp warning)
(sensor_7,normal)
(sensor_10,38.3,high temp warning)
(sensor_7,normal)
  1. Connect 的数据类型可以不同,Connect 只能合并两个流;
  2. Union可以合并多条流,Union的数据结构必须是一样的;
4.3.5:更细粒度的UDF函数 4.4:Sink
``Flink``没有类似于``spark``中``foreach``方法,让用户进行迭代的 *** 作。虽有对外的输出 *** 作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出 *** 作。

4.5:Window

​ streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算 *** 作。

4.5.1:Windown的类型

时间窗口(Time Window)
滚动时间窗口
滑动时间窗口
会话窗口
计数窗口(Count Window)
滚动计数窗口
滑动计数窗口
TimeWindow:按照时间生成Window。
CountWindow:按照指定的数据条数生成一个Window,与时间无关。

滚动窗口:

​ 依据固定的窗口长度对数据进行切分
​ 时间对齐,窗口长度固定,没有重叠

滑动窗口:

​ 可以按照固定的长度向后滑动固定的距离
​ 滑动窗口由固定的窗口长度和滑动间隔组成
​ 可以有重叠(是否重叠和滑动距离有关系)
​ 滑动窗口是固定窗口的更广义的一种形式,滚动窗口可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)

会话窗口:

​ 由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口。
​ 特点:时间无对齐。

4.5.2:Window API

窗口分配器——window()方法
我们可以用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他处理 *** 作。
注意window()方法必须在keyBy之后才能使用。keyBy是一个分组函数。
Flink提供了更加简单的.timeWindow()和.countWindow()方法,用于定义时间窗口和计数窗口。

		// windowAll 全部放入同一个分区 不建议使用        

		/**
         * keyBy 之后的分组函数包括
         *      timeWindow(Time)    调用滚动窗口
         *      timeWindow(Time, Time)  调用滑动窗口
         *      window()    自己传入窗口类型
         *      window(EventTimeSessionWindows.withGap(Time.seconds(10)))   会话窗口
         *      countWindow(long)   滚动
         *      countWindow(long, long) 滑动
         */

窗口函数

window function 定义了要对窗口中收集的数据做的计算 *** 作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)

增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。

全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction

其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流

4.6:时间语义

Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行 *** 作算子的本地系统时间,与机器相关;

Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

  timeWindow(Time, Time)  调用滑动窗口
            window()    自己传入窗口类型
            window(EventTimeSessionWindows.withGap(Time.seconds(10)))   会话窗口
            countWindow(long)   滚动
            countWindow(long, long) 滑动

窗口函数

window function 定义了要对窗口中收集的数据做的计算 *** 作,主要可以分为两类:
增量聚合函数(incremental aggregation functions)
全窗口函数(full window functions)

增量聚合函数
每条数据到来就进行计算,保持一个简单的状态。(来一条处理一条,但是不输出,到窗口临界位置才输出)
典型的增量聚合函数有ReduceFunction, AggregateFunction。

全窗口函数
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。(来一个放一个,窗口临界位置才遍历且计算、输出ProcessWindowFunction,WindowFunction

其他API
trigger() ——触发器定义window 什么时候关闭,触发计算并输出结果
evitor() ——移除器定义移除某些数据的逻辑
allowedLateness() ——允许处理迟到的数据
sideOutputLateData() ——将迟到的数据放入侧输出流
getSideOutput() ——获取侧输出流

4.6:时间语义

Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行 *** 作算子的本地系统时间,与机器相关;

Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。








欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/720028.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存