Flink java 流处理API

Flink java 流处理API,第1张

Flink java 流处理API 一、创建执行环境
        //创建批处理执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //创建流处理执行环境
        StreamExecutionEnvironment env_stream =
                StreamExecutionEnvironment.getExecutionEnvironment();

二、Transform 简单转换算子 1.Map

Map转换算子不会改变流中的数据的数量,即一对一关系。可以通过map == 映射来方便记忆。

Map经常用于将流中的数据进行包装成pojo,或进行简单的处理。

DataStream mapStram = dataStream.map(new MapFunction() {
    public Integer map(String value) throws Exception {
        return value.length();
    }
});

流调用map方法,实现MapFunction接口中的map方法。

2.FlatMap

FlatMap算子会改变流中的数据数量,一个数据过来往往会被切分成很多个数据。是一对多的关系。

经常利用FlatMap算子的这个特性去处理需要细分的数据流。

DataStream flatMap = dataSource.flatMap(new FlatMapFunction() {
    public void flatMap(String s, Collector collector) throws Exception {
        String[] words = s.split(" ");
        for (String word : words) {
            collector.collect(word);
        }
    }
});

 实现FlatMapFunction接口,重写flatMap方法。由于一个数据进来会产生多个数据,故采用collector来进行收集产生的多个数据。

3.Filter

Filter也会改变流中的数据量,但是不会改变数据的类型。只是进行简单的过滤 *** 作。

DataStream filter = dataSource.filter(new FilterFunction() {
    public boolean filter(String s) throws Exception {
        return s.contains("ala");
    }
});

4.KeyBy

KeyBy算子不改变原有数据的数据类型也不改变数据的数量,仅仅进行分组 *** 作。

直接通过.keyBy的方式进行分组,若流中的数据为元组类型可以直接通过0 1这种下标来指定要按照什么字段进行分组,若流中的数据为pojo类,则可以通过字段名进行指定。

滚动聚合算子 sum min max minBy maxBy

 其中min和minBy的区别在于:----max和maxBy类推

min(temperature)只会找到temperature这一个字段的最小值,其他字段还是停留在流中的第一个数据的值。

而minBy在找到temperature这个字段的最小值的同时,其他字段也是该最小值对应的字段。

reduce算子

reduce算子的特点在于:

它是一个分组数据流的聚合 *** 作,合并当前的元素 和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是 只返回最后一次聚合的最终结果。这个每一次的聚合结果是有自定义的reduce方法决定的。

.reduce(new ReduceFunction() {
    public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
        return new SensorReading(
                t1.getId(),
                Math.max(sensorReading.getTimestamp(), t1.getTimestamp()),
                Math.max(sensorReading.getTemperature(), t1.getTemperature()));
    }
});

分流与合流 1.分流与拣选----split select

所谓的分流 *** 作并不是将流拆分成了两条流,实际上还是一条流,只不过为这一条流打上了不同的标签,后期根据标签就可以拣选出自己想要的流了。

以下是拣选过程:

//进行分流
SplitStream splitStream = dataStream.split(new OutputSelector() {
    public Iterable select(SensorReading sensorReading) {
        return (sensorReading.getTemperature() > 35) ?
                Collections.singletonList("high") :
                Collections.singletonList("low");
    }
});
//进行拣选
DataStream high = splitStream.select("high");
DataStream low = splitStream.select("low");
DataStream all = splitStream.select("high", "low");
 2.合流----Connect

所谓的合流并没有将两条流变成一条流,内部还是各自是各自的数据类型,往往将两条流Connect之后会进行一步map *** 作,通过map使得两条流变成同一数据类型(一般选取两条流的数据类型的公共父类作为合并流的数据类型,一般选取Object类型)。

3.并流----Union

Connect与Union之间的区别:

        1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。

        2. Connect 只能 *** 作两个流,Union 可以 *** 作多个。

三、富函数(RichFunctions) 什么是富函数:

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都 有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一 些生命周期方法,所以可以实现更复杂的功能。由于功能更加的丰富所以被叫做富函数。

富函数的生命周期:

Rich Function 有一个生命周期的概念。典型的生命周期方法有:
open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。

close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。

例如:当我们需要在map、filter 等函数中 *** 作数据库的时候,我们就应该把与数据库的连接、关闭连接放在生命周期中,这样避免了平凡的去与数据库进行连接以及关闭。

public static class MySinkJDBC extends RichSinkFunction {
    Connection connection = null;
    PreparedStatement insertStat = null;
    PreparedStatement updateStat = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager
                .getConnection("jdbc:mysql://111.11.111.111:3306/ala?characterEncoding=utf-8&useSSL=false",
                        "root", "root");
        insertStat = connection.prepareStatement("insert into sensor(id, temp) values(?, ?)");
        updateStat = connection.prepareStatement("update sensor set temp = ? where id = ?");
    }
    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        updateStat.setDouble(1, value.getTemperature());
        updateStat.setString(2, value.getId());
        updateStat.execute();
        if (updateStat.getUpdateCount() == 0){
            insertStat.setString(1, value.getId());
            insertStat.setDouble(2, value.getTemperature());
            insertStat.execute();
        }
    }
    @Override
    public void close() throws Exception {
        insertStat.close();
        updateStat.close();
        connection.close();
    }
}

四、Flink中的Window Window的分类

 Window Function

窗口函数定义了在一个窗口中需要干的事情,主要分为两类:

增量聚合函数:每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction, AggregateFunction。

全量窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。 ProcessWindowFunction 就是一个全窗口函数。

//进行keyBy并进行基于时间开窗进行增量聚合函数
        DataStream resultStream = sensorReadingDataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading sensorReading, Integer integer) {
                        return integer + 1;
                    }

                    @Override
                    public Integer getResult(Integer integer) {
                        return integer;
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return integer + acc1;
                    }
                });

        //进行keyBy并进行基于时间开窗进行全量聚合函数
        DataStream> resultStream = sensorReadingDataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = timeWindow.getEnd();
                        Integer count = IteratorUtils.toList(iterable.iterator()).size();
                        collector.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });

五、时间语义与 Wartermark 三种时间语义 1.Event Time

是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。

2.Ingestion Time

是数据进入 Flink 的时间。

3.Processing Time

是每一个执行基于时间 *** 作的算子的本地系统时间,与机器 相关,默认的时间属性就是 Processing Time。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Watermark

每来一个数据后面就跟着一个Watermark时间(这个不是绝对的也有周期性生成Watermark的),该时间 = 来的数据的时间 - 设置的Watermark。如果这个差值等于了窗口大小就关窗。

Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻 为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。

例子:

一个基于Event Time的例子:

图解1:

此图中的窗口时间设置为5s,Watermark 设置为2s。图中虚线的两个位置代表关窗的时间。

当7s的数据来的时候,7 - 2 = 5,是窗口大小的整数倍,因此触发了关窗。

如果7s的这个数据一直没有来,并且大于7s的数据也没有来,那么这个窗口就会一直不进行关闭 *** 作。

图解2:

总结:

1.在设置Watermark的时候应该将值设置为最大的乱序查(上图中10在8前面,那么最大的乱序差就是2)这样就可以保证在关窗的时候所有的时间都已经到齐了。

2.下面的绿色的三角形的值由于是Watermark实时更新的事件时间,所以这个时间值是不可逆的,只能单调递增。

Watermark设置代码:
//将dataSource里面获取的数据map成SensorReading类型
        DataStream sensorReadingDataStream = dataSource.map(s -> {
            String[] split = s.split(",");
            return new SensorReading(split[0],
                    Long.parseLong(split[1].trim().substring(0, split[1].trim().length() - 1)),
                    Double.parseDouble(split[2]));
        })
                //分配时间戳和Watermark
                //1.流中的数据完全的升序,即没有乱序的情况,自然不需要设置Watermark,只需要分配时间戳即可。
//                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
//                    @Override
//                    public long extractAscendingTimestamp(SensorReading sensorReading) {
//                        return sensorReading.getTimestamp();
//                    }
//                });
                //2.流中的数据存在乱序,将Watermark设置为2s,并且分配时间戳。
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading sensorReading) {
                        return sensorReading.getTimestamp() * 1000;
                    }
                });

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5711145.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存