Getting Started with Flink



Official site: https://flink.apache.org/

1. Introduction to Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.


1.2 Why Flink

Streaming data: data that arrives continuously, without end.

Goals: low latency, high throughput, accuracy, fault tolerance.

1.3 Flink Features

Event-driven

Bounded streams: use the DataSet API

Unbounded streams: use the DataStream API

Layered APIs

Support for both event time and processing time

Exactly-once state consistency guarantees

Low latency: millions of events per second at millisecond-level latency

High availability

Connectors to many commonly used storage systems

1.4 Flink vs. Spark Streaming

Stream processing vs. micro-batch processing

    Data model
    Runtime architecture
2. Flink Deployment

2.1 Standalone

Jobs can be deployed through the web UI,

or with shell commands:

# Start the cluster
./bin/start-cluster.sh
# Stop the cluster
./bin/stop-cluster.sh

# Submit a job
./bin/flink run -c [main class] -p [parallelism] [path to the jar] [JVM options]
# List all current jobs
./bin/flink list
# Cancel a job
./bin/flink cancel [jobId]
2.2 YARN

Requires a Hadoop cluster.

No environment to install one here, so this is skipped.

2.3 k8s

3. Flink Runtime Architecture

3.1 Runtime Components

3.1.1 JobManager

3.1.2 TaskManager

3.1.3 ResourceManager

3.1.4 Dispatcher

3.2 Job Submission Flow

3.3 Task Scheduling

3.4 Slots

Parallelism: the number of parallel subtasks of a particular operator is called its parallelism.

In general, the parallelism of a stream can be taken to be the maximum parallelism among all of its operators.

A slot is the smallest unit of resources (CPU, memory) needed to execute one task thread of a Flink computation.

The number of slots is therefore usually set to the number of cores of the TaskManager (JVM).

Slots can be grouped into sharing groups.

Operators in different sharing groups must use different slots, as the sketch below shows.
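Not in the original notes: a minimal Java sketch, assuming a local socket source on port 9999, of setting per-operator parallelism and a slot sharing group (the group name "red" is made up):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.socketTextStream("localhost", 9999);

        stream
                // per-operator parallelism: this map runs as 2 parallel subtasks
                .map(String::toUpperCase).setParallelism(2)
                // operators in the "red" sharing group cannot share slots with the default group
                .filter(s -> !s.isEmpty()).slotSharingGroup("red")
                .print();

        env.execute("slot sharing sketch");
    }
}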

3.5 Programs and Dataflows

A Flink program consists of three parts: source, transform, and sink.

Forms of data transfer between operators (see the sketch below):

    One-to-one: only possible when the operators are in the same sharing group and have the same parallelism
    Redistributing: a repartitioning operation; when parallelism differs, records are redistributed, e.g. round-robin
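A minimal sketch (not from the original notes; the data is made up) contrasting the two transfer modes:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransferModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> words = env.fromElements("a", "b", "a", "c");

        // map forwards one-to-one when producer and consumer have the same parallelism
        DataStream<Tuple2<String, Integer>> ones = words
                .map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT)); // the lambda loses tuple type info, so declare it

        // redistributing: keyBy hashes each record to the subtask that owns its key
        ones.keyBy(0).sum(1).print("keyed");

        // redistributing: rebalance sends records round-robin across subtasks
        words.rebalance().print("rebalanced");

        env.execute("transfer mode sketch");
    }
}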
4. The Stream Processing API

The stream processing flow:

Environment => source => transform => sink

4.1 Environment

The execution environment:

// Stream processing execution environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Batch processing execution environment
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

// Create a local execution environment
ExecutionEnvironment.createLocalEnvironment([parallelism]);

// Create a remote execution environment
ExecutionEnvironment.createRemoteEnvironment(host, port, pathToJar);


4.2 Source

Flink can read data from a variety of sources.

4.2.1 Reading from collections and elements

API executionEnvironment.fromCollection(list);

public static void main(String[] args) throws Exception {

        // Create the stream execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism to 1
        executionEnvironment.setParallelism(1);

        // Build test data in a collection
        List<SensorReading> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));
        }
        // Read the data from the collection
        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);
        // Print the collection data
        sensorReadingDataStreamSource.print("sensor");

        // Read data directly from elements
        DataStreamSource<Integer> integerDataStreamSource = executionEnvironment.fromElements(1, 2, 3, 4, 56, 7);
        // Print the element data
        integerDataStreamSource.print("element");

        // Execute the Flink job
        executionEnvironment.execute();

    }
4.2.2 Reading from a file

API executionEnvironment.readTextFile(inputPath);

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        String inputPath = "E:\\张尧\\idea项目\\FlinkTutorial\\src\\main\\resources\\word.txt";
        DataStreamSource<String> dataStreamSource = executionEnvironment.readTextFile(inputPath);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new WorkCount.MyFlagMapFunction()).keyBy(0).sum(1);
        sum.print();

        executionEnvironment.execute();
    }
4.2.3 Reading from Kafka

4.2.3.1 Kafka setup

Download Kafka version 1.0.0 or later.

Configure Kafka's advertised listener address (not needed when everything runs on the same machine).

Edit config/server.properties:

advertised.listeners=PLAINTEXT://192.168.164.205:9092
# Start from the Kafka bin directory
# Start ZooKeeper first
./zookeeper-server-start.sh ../config/zookeeper.properties
# Then start the Kafka broker
./kafka-server-start.sh ../config/server.properties
package com.zy.flink.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;


public class KafkaSourceTest {

    public static void main(String[] args) throws Exception {

        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.164.205:9092");
//        properties.setProperty("group.id", "")

        // Create the stream execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // Read data from Kafka
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new FlinkKafkaConsumer<>("sourcetest",
                new SimpleStringSchema(), properties));

        dataStreamSource.print();

        executionEnvironment.execute();
    }
}

4.2.4 Custom sources
package com.zy.Flink.source;

import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;


public class UDFSourceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.addSource(new MySensorSource());
        sensorReadingDataStreamSource.print();
        executionEnvironment.execute();
    }


    public static class MySensorSource implements SourceFunction<SensorReading> {

        // Flag that controls data generation
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            // Initialize a set of sensors with baseline temperatures
            HashMap<String, Double> map = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                map.put("sensor" + i, 60 + ThreadLocalRandom.current().nextGaussian() * 20);
            }
            while (running) {
                for (String s : map.keySet()) {
                    sourceContext.collect(new SensorReading(s, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), map.get(s) + ThreadLocalRandom.current().nextGaussian()));
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

4.3 Transform

Transformation operators.

4.3.1 Basic transformation operators

map, flatMap, and filter are the three basic transformation operators.

package com.zy.Flink.transform;

import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;


public class TransormTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // Build test data in a collection
        List<SensorReading> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));
        }

        // Read the data from the collection
        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);


        // map: return the sensor id of each SensorReading
        SingleOutputStreamOperator<String> map = sensorReadingDataStreamSource.map(new MapFunction<SensorReading, String>() {
            @Override
            public String map(SensorReading sensorReading) throws Exception {
                return sensorReading.getId();
            }
        });

        // flatMap: split each record into its individual fields and emit them one by one
        SingleOutputStreamOperator<String> flatMap = sensorReadingDataStreamSource.flatMap(new FlatMapFunction<SensorReading, String>() {
            @Override
            public void flatMap(SensorReading sensorReading, Collector<String> collector) throws Exception {
                String[] split = sensorReading.toString().split(", ");
                for (String s : split) {
                    collector.collect(s);
                }
            }
        });

        // filter: keep only readings from "Sensor2"
        SingleOutputStreamOperator<SensorReading> filter = sensorReadingDataStreamSource.filter(new FilterFunction<SensorReading>() {
            @Override
            public boolean filter(SensorReading sensorReading) throws Exception {
                return sensorReading.getId().equals("Sensor2");
            }
        });

        // Print the map results
        map.print("map");

        // Print the flatMap results
        flatMap.print("flatMap");

        // Print the filter results
        filter.print("filter");

        executionEnvironment.execute();
    }
}

 
4.3.2 Aggregation operators

keyBy partitions the stream by key; the rolling aggregation operators (min(), max(), sum(), minBy(), maxBy()) and reduce then aggregate on the keyed stream.

package com.zy.Flink.transform;

import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransormTest3 {

    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism
        executionEnvironment.setParallelism(1);

        // Read the test data from a collection
        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());

        // keyBy id, then take the maximum temperature per key
        SingleOutputStreamOperator<SensorReading> max = sensorReadingDataStreamSource.keyBy("id").maxBy("tmpperature");

        // Print
        max.print();

        // Execute
        executionEnvironment.execute();

    }
}

The difference between the aggregations min() and minBy(): with min(), only the aggregated field holds the minimum; the other fields keep the values of the first record received.

minBy() emits the entire record that contains the minimum value of the aggregated field, as the sketch below illustrates.
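A minimal sketch (the two readings are made up; SensorReading is the entity used throughout these notes) that makes the difference visible:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinVsMinBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // two readings for the same sensor; the second has the lower temperature
        DataStreamSource<SensorReading> src = env.fromElements(
                new SensorReading("Sensor1", 100L, 36.0),
                new SensorReading("Sensor1", 200L, 35.0));

        // min: after the second record the latest result is (Sensor1, 100, 35.0) --
        // only the temperature is updated, the timestamp still comes from the first record
        src.keyBy("id").min("tmpperature").print("min");

        // minBy: after the second record it emits (Sensor1, 200, 35.0) --
        // the whole record that holds the minimum
        src.keyBy("id").minBy("tmpperature").print("minBy");

        env.execute("min vs minBy sketch");
    }
}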

4.3.3 Reduce
package com.zy.Flink.transform;

import com.zy.Flink.TestUtil;
import com.zy.Flink.entity.SensorReading;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransormTest4_Reduce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());

        // Key the stream by sensor id
        KeyedStream<SensorReading, String> keyedStream = sensorReadingDataStreamSource.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading value) throws Exception {
                return value.getId();
            }
        });

        // reduce: keep the maximum temperature seen so far, stamped with the current time
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce((value1, value2) -> {
            return new SensorReading(value1.getId(), System.currentTimeMillis(), Math.max(value1.getTmpperature(), value2.getTmpperature()));
        });

        reduce.print();

        executionEnvironment.execute();

    }
}

4.3.4 Multi-stream transformations

split (removed in 1.12)

connect (with coMap) can only combine two streams

union can merge multiple streams of the same type; see the sketch below
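Not in the original notes: a minimal sketch (stream contents made up) contrasting connect and union:

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class MultiStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> ints = env.fromElements(1, 2, 3);
        DataStream<String> strings = env.fromElements("a", "b");

        // connect: exactly two streams, possibly of different types;
        // the CoMapFunction maps each side to a common output type
        ConnectedStreams<Integer, String> connected = ints.connect(strings);
        connected.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) {
                return "int: " + value;
            }

            @Override
            public String map2(String value) {
                return "string: " + value;
            }
        }).print("connect");

        // union: any number of streams, but all must have the same type
        DataStream<String> more = env.fromElements("c", "d");
        strings.union(more).print("union");

        env.execute("multi-stream sketch");
    }
}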

4.3.5 Data types

Data types supported by Flink:

    All Java and Scala basic data types (including boxed types)
    Java and Scala tuples
    Scala case classes
    Java POJOs (with a no-arg constructor)
    Java collections and enums
4.3.6 UDF functions
    Function classes
    Anonymous functions (lambdas)
    Rich functions (see the sketch below)
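Not in the original notes: a minimal sketch of a rich function; unlike a plain MapFunction, a RichMapFunction has lifecycle methods (open/close) and access to the runtime context:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.fromElements("a", "b", "c", "d")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public void open(Configuration parameters) {
                        // called once per parallel instance, e.g. to open a connection
                        System.out.println("open subtask " + getRuntimeContext().getIndexOfThisSubtask());
                    }

                    @Override
                    public String map(String value) {
                        // the runtime context is available in every call
                        return value + "@" + getRuntimeContext().getIndexOfThisSubtask();
                    }

                    @Override
                    public void close() {
                        // called once on shutdown, e.g. to release the connection
                    }
                })
                .print();

        env.execute("rich function sketch");
    }
}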
4.3.7 Repartitioning

4.3.7.1 shuffle

Randomly reshuffles records across partitions.

4.3.7.2 keyBy

Computes the target partition from the key's hash; records with the same key always land in the same partition (though one partition may hold several keys).

4.3.7.3 global

Sends all records of the stream to a single downstream partition (the first one).

package com.zy.flink.transform;

import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransfromTest6_partition {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // To observe repartitioning, parallelism must be greater than 1
        executionEnvironment.setParallelism(4);

        // Read data from a file
        SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.readTextFile(
                "E:\\张尧\\idea项目\\tl\\Flink_learn\\learn_feature\\src\\main\\resources\\sensorReading.txt").map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });

        // Print the source stream
        dataStreamSource.print("input");

        // shuffle
        dataStreamSource.shuffle().print("shuffle");

        // keyBy
        dataStreamSource.keyBy("id").print("keyBy");

        // global
        dataStreamSource.global().print("global");

        // Execute the job
        executionEnvironment.execute();


    }
}

4.4 Sink

4.4.1 Writing to Kafka
package com.zy.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;


public class KafkaSink {

    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.164.205:9092");

        // Consume messages from Kafka
        DataStreamSource<String> sourcetest =
                executionEnvironment.addSource(new FlinkKafkaConsumer<>("sourcetest",
                new SimpleStringSchema(), properties));

        // Write the stream back to Kafka
        DataStreamSink<String> sinktest = sourcetest.addSink(new FlinkKafkaProducer<>("192.168.164.205:9092", "sinktest", new SimpleStringSchema()));

        // Execute the job
        executionEnvironment.execute();

    }
}

This requires the Kafka connector dependency:

        
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${flinks-clients.suffix.version}</artifactId>
</dependency>
4.4.2 Writing to Redis
<flinks-clients.suffix.version>2.11</flinks-clients.suffix.version>
<flinks-redis.version>1.0</flinks-redis.version>

<!-- Redis connector, published under Apache Bahir -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${flinks-clients.suffix.version}</artifactId>
    <version>${flinks-redis.version}</version>
</dependency>
package com.zy.flink.sink;

import com.zy.flink.TestUtil;
import com.zy.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;


public class RedisSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());

        // Build the Jedis connection config
        FlinkJedisPoolConfig flinkJedisPoolConfig =
                new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
        DataStreamSink<SensorReading> test = sensorReadingDataStreamSource.addSink(new RedisSink<>(flinkJedisPoolConfig,
                new RedisMapper<SensorReading>() {
                    // Describe the Redis command: HSET into the hash named "test"
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        return new RedisCommandDescription(RedisCommand.HSET, "test");
                    }

                    @Override
                    public String getKeyFromData(SensorReading sensorReading) {
                        return sensorReading.getId();
                    }

                    @Override
                    public String getValueFromData(SensorReading sensorReading) {
                        return sensorReading.getTmpperature().toString();
                    }
                }));

        executionEnvironment.execute();
    }
}

4.4.3 Writing to Elasticsearch

4.4.4 Writing to JDBC

That is, writing to a relational database; a sketch of a hand-rolled JDBC sink follows.
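The notes stop here, so below is a minimal sketch (not the notes' code) of a JDBC sink built on RichSinkFunction, assuming a local MySQL database, a made-up sensor_temp(id, temp) table, and the SensorReading entity used elsewhere in these notes:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class JdbcSinkSketch extends RichSinkFunction<SensorReading> {

    private Connection connection;
    private PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open the connection once per parallel instance (URL and credentials are made up)
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
        upsert = connection.prepareStatement(
                "INSERT INTO sensor_temp (id, temp) VALUES (?, ?) ON DUPLICATE KEY UPDATE temp = ?");
    }

    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        // one upsert per record
        upsert.setString(1, value.getId());
        upsert.setDouble(2, value.getTmpperature());
        upsert.setDouble(3, value.getTmpperature());
        upsert.execute();
    }

    @Override
    public void close() throws Exception {
        upsert.close();
        connection.close();
    }
}

It would be attached like any other sink: dataStream.addSink(new JdbcSinkSketch());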

5. Windows

5.1 Window concepts

A window is a way of slicing an unbounded stream into finite streams.

5.2 Window types

Flink distinguishes time windows and count windows; the sketch after this list shows the corresponding assigners.

    Time windows
      Tumbling time windows (Tumbling Windows)
        Aligned in time, fixed window length; each record belongs to exactly one window
      Sliding time windows
        Have a slide step; one record can belong to multiple windows
      Session windows
        Not aligned in time
    Count windows
      Tumbling count windows
      Sliding count windows
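Not in the original notes: a minimal sketch of the assigners on a keyed stream, using the pre-1.12 timeWindow/countWindow helpers that the rest of these notes use; the socket source and durations are made up:

import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTypesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<String, String> keyed = env
                .socketTextStream("localhost", 9999)   // lines like "sensor1,..."
                .keyBy(line -> line.split(",")[0]);

        // tumbling time window: fixed 15-second length, no overlap
        keyed.timeWindow(Time.seconds(15))
             .reduce((a, b) -> a + "|" + b)
             .print("tumbling");

        // sliding time window: 15-second length, sliding every 5 seconds
        keyed.timeWindow(Time.seconds(15), Time.seconds(5))
             .reduce((a, b) -> a + "|" + b)
             .print("sliding");

        // session window: closes after a 10-second gap without data
        keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
             .reduce((a, b) -> a + "|" + b)
             .print("session");

        // tumbling count window: fires every 5 records per key
        keyed.countWindow(5)
             .reduce((a, b) -> a + "|" + b)
             .print("count");

        env.execute("window types sketch");
    }
}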
5.3 Window API

5.3.1 Window assigners

window()

timeWindow()

countWindow()

These are all windowing operations.

5.3.2 Window functions

Once a window has been opened, a window function performs the aggregation.

    Incremental aggregation functions: the bucket stores only a single running result, recomputed as each record arrives
      ReduceFunction, AggregateFunction, ...
    Full window functions: all records are collected in the bucket and the result is computed at the end (a ProcessWindowFunction sketch follows the example below)
      ProcessWindowFunction, WindowFunction, ...
package com.zy.flink.window;

import com.zy.flink.entity.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;


public class WindowApiTest1_TimeWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());
        // Read lines like "sensor1,1547718199000,35.8" from a socket
        SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.socketTextStream(
                "192.168.164.205", 8888).map(line -> {
            String[] splits = line.split(",");
            return new SensorReading(splits[0], new Long(splits[1]), new Double(splits[2]));
        });
        // Count readings per sensor in 10-second tumbling windows with an incremental AggregateFunction
        SingleOutputStreamOperator<Integer> resultStream = dataStreamSource.keyBy("id")
                .timeWindow(Time.seconds(10))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading s, Integer integer) {
                        return integer + 1;
                    }

                    @Override
                    public Integer getResult(Integer integer) {
                        return integer;
                    }

                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return integer + acc1;
                    }
                });

        resultStream.print();
        executionEnvironment.execute();
    }
}
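For comparison with the incremental AggregateFunction above, a minimal sketch (not from the notes) of the full-window alternative: a ProcessWindowFunction receives all buffered records of a window at once, together with window metadata:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<String> counts = env
                .socketTextStream("192.168.164.205", 8888)   // lines like "sensor1,..."
                .keyBy(line -> line.split(",")[0])
                .timeWindow(Time.seconds(10))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
                        // all records of the window are buffered and handed over together
                        int count = 0;
                        for (String ignored : elements) {
                            count++;
                        }
                        out.collect(key + ": " + count + " records in window ending " + context.window().getEnd());
                    }
                });

        counts.print();
        env.execute("process window sketch");
    }
}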

5.3.3 Optional API

These options compose on a windowed stream, as in the sketch after this list.

    trigger
      Defines when the window fires, i.e. when the computation runs and emits its result
    evictor
      Defines logic for removing certain records from the window
    allowedLateness
      Allows late data
    sideOutputLateData
      Sends late data to a side output
    getSideOutput
      Retrieves the side output stream
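Not in the original notes: a minimal sketch combining these options; the tag name, socket source, and durations are made up:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // tag for records that arrive after the allowed lateness has expired
        OutputTag<String> lateTag = new OutputTag<String>("late") {};

        SingleOutputStreamOperator<String> result = env
                .socketTextStream("localhost", 9999)   // lines like "sensor1,1547718199000"
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[1]);
                    }
                })
                .keyBy(line -> line.split(",")[0])
                .timeWindow(Time.seconds(10))
                .allowedLateness(Time.minutes(1))      // keep the window state one extra minute
                .sideOutputLateData(lateTag)           // anything later still goes to the side output
                .reduce((a, b) -> a + "|" + b);

        result.print("in time (or within lateness)");
        result.getSideOutput(lateTag).print("late");

        env.execute("late data sketch");
    }
}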
6. Time

6.1 Time Semantics

There are three notions of time:

    Event Time: the time at which the event itself occurred, produced together with the business data
    Ingestion Time: the time at which the data enters Flink
    Processing Time: the time at which a Flink operator performs its computation
6.2 EventTime

Enabling event time:

executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
6.3 Watermarks

Watermarks handle the delayed arrival of out-of-order data: when records arrive out of order because of the network or distributed processing, a watermark trades a small amount of latency for completeness. A minimal assignment sketch follows.
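A sketch using the BoundedOutOfOrdernessTimestampExtractor available in the Flink versions these notes target; the 2-second bound is made up, and it assumes the SensorReading entity exposes a getTimestamp() accessor:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<SensorReading> withWatermarks = env
                .socketTextStream("localhost", 9999)   // lines like "sensor1,1547718199000,35.8"
                .map(line -> {
                    String[] splits = line.split(",");
                    return new SensorReading(splits[0], new Long(splits[1]), new Double(splits[2]));
                })
                // watermark = max event time seen so far minus 2s: up to 2s of disorder is tolerated
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp();   // assumed getter on the notes' entity
                    }
                });

        withWatermarks.keyBy("id").timeWindow(Time.seconds(10)).maxBy("tmpperature").print();
        env.execute("watermark sketch");
    }
}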

6.4 Watermark propagation, assignment, and configuration
