【深入浅出flink】第4篇:flink常见的并行度和多并行度Source,你掌握了多少?

【深入浅出flink】第4篇:flink常见的并行度和多并行度Source,你掌握了多少?,第1张

【深入浅出flink】第4篇:flink常见的并行度和多并行度Source,你掌握了多少?

大家好,我是雷恩Layne,这是《深入浅出flink》系列的第四篇文章,我旨在用最直白的语言写好flink,希望能让所有看到的人一目了然。如果大家喜欢,欢迎点赞、关注,也欢迎留言,共同交流flink的点点滴滴 O(∩_∩)O

文章目录

1. Source简介2. Flink预定义的Source

2.1 基于文件的Source2.2 基于Socket的Source2.3 基于集合的Source2.4 基于Kafka的Source 3. 自定义单并行度Source4. 自定义多并行度Source

DataStream是Flink的较低级API,用于进行数据的实时处理任务,可以将该编程模型分为Source、Transformation、Sink三个部分,如下图所示。本文来介绍常用的并行度Source和多并行度Source。

1. Source简介

source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)
来为你的程序添加一个source。

flink提供了大量的已经实现好的source方法,也可以自定义source:

    通过实现sourceFunction接口来自定义无并行度的source通过实现ParallelSourceFunction 接口or 继承RichParallelSourceFunction 来自定义有并行度的
    source

大多数情况下,我们使用自带的source即可。

2. Flink预定义的Source

flink提供了大量的已经实现好的source,常见的有:

基于文件的Source基于Socket的Source基于集合的Source基于Kafka的Source 2.1 基于文件的Source

常见基于文件的Source有如下四种:

//1.逐行读取文本文件(文件符合 TextInputFormat 格式),并作为字符串返回每一行。
readTextFile(String path)

//2.按指定的文件输入格式(fileInputFormat)读取指定路径的文件。
readFile(FileInputFormat inputFormat,String path) 
 
//3.前两个方法的内部调用方法
readFile(FileInputFormat inputFormat,
  		 String filePath,
		 FileProcessingMode watchType,
		 long interval,
		 FilePathFilter filter)

//4.处理流文件    
readFileStream(String filePath, long intervalMillis, 
			   FileMonitoringFunction.WatchType watchType)

前两个好理解,这里着重讲解一下第三个和第四个方法。

第三个方法是前两个方法的内部调用方法,总共有四个参数,具体含义如下:

参数说明实例inputFormat创建DataStream指定的输入格式new TextInputFormat(new Path(filePath))filePath读取的文件路径,为URI格式。既可以读取普通文件,可以读取HDFS文件file:///some/local/file 或
hdfs://host:port/file/pathwatchType文件数据处理方式FileProcessingMode.PROCESS_ONCE或
FileProcessingMode.PROCESS_CONTINUOUSLYinterval在周期性监控Source的模式下(PROCESS_CONTINUOUSLY),指定每次扫描的时间间隔10filter忽略给定的path

watchType有两种策略:

FileProcessingMode.PROCESS_ONCE:定期监听路径下的新数据FileProcessingMode.PROCESS_CONTINUOUSLY:处理当前contents中的文件然后退出(只处理一次)

第四个方法是用来支持读取流式文件的,方法中intervalMilli是轮询的间隔,watchType有三种策略:

FileMonitoringFunction.WatchType.ONLY_NEW_FILES:只处理新文件FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED :只处理追加的内容FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED :既要处理追加的内容,也会处理file中之前的内容 2.2 基于Socket的Source

常见基于Socket的Source主要有两种:

//1.逐行从socket中读取内容
socketTextStream(String hostname, int port)
    
//2.按照给定分隔符读取socket的内容,默认为"n"
socketTextStream(String hostname, int port, String delimiter)

socketTextStream比较简单,没啥好说的。

2.3 基于集合的Source

常见基于集合的Source有五种:

//1.用 Java.util.Collection 对象创建数据流,集合中的所有元素必须属于同一类型
fromCollection(Seq) 

//2.用迭代器创建数据流。指定迭代器返回的元素的数据类型
fromCollection(Iterator)

//3.从给定的对象序列创建数据流。所有对象必须属于同一类型
fromElements(elements: _*)
 
//4.并行地从迭代器创建数据流。指定迭代器返回的元素的数据类型;
fromParallelCollection(SplittableIterator) #
 
//5.并行生成给定间隔的数字序列。
generateSequence(from, to) 

举一个比较简单的例子:

DataStream ds1 = env.fromElements(1, 2, 4, 67, 189);
DataStream ds2 = env.fromCollection(Arrays.asList(1, 2, 4, 67, 189));
2.4 基于Kafka的Source

flink给给我提供了消费kafka的source,这使得我们用一行代码就可以消费kafka中的数据。

根据不同的版本,flink给我们提供了三种kafka source,分别是:

FlinkKafkaConsumer09FlinkKafkaConsumer010FlinkKafkaConsumer011

举例如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "wxler1:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

DataStream dataStream = env.addSource(new FlinkKafkaConsumer011("sensor", new SimpleStringSchema(), properties));
// 打印输出
dataStream.print();

env.execute();
3. 自定义单并行度Source

除了flink本身提供的source之外,我们也可以自定义source。可以通过实现sourceFunction接口来自定义无并行度的source。

示例如下:

(1)自定义Source

import org.apache.flink.streaming.api.functions.source.SourceFunction;
//功能:每秒产生一条数据
public class MyNoParallelSource implements SourceFunction {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            //每秒生成一条数据
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        isRunning=false;
    }
}

(2)定义Consume,消费Source的数据,并打印输出

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//功能:打印输出偶数
public class MyNoParallelConsumer {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
        //默认并行度为1
        DataStreamSource numberStream = env.addSource(new MyNoParallelSource());
        DataStream dataStream = numberStream.map(new MapFunction() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        DataStream filterDataStream = dataStream.filter(new FilterFunction() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0; //过滤偶数
            }
        });
        filterDataStream.print().setParallelism(1);
        env.execute();
    }
}

执行输出:

接受到了数据:1
接受到了数据:2
2
接受到了数据:3
接受到了数据:4
4
接受到了数据:5
接受到了数据:6
6
4. 自定义多并行度Source

通过实现ParallelSourceFunction 接口or 继承RichParallelSourceFunction 来自定义有并行度的
source。

(1)自定义多并行度Source

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
//功能:自定义支持并行度的数据源
public class MyParallelSource implements ParallelSourceFunction {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            //每秒生成一条数据
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        isRunning=false;
    }
}

(2)定义Consume,消费多并行度Source的数据,并打印输出

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//功能:消费多并行度Source的数据,并打印输出偶数
public class MyParallelConsumer {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
        //默认并行度为cpu core数,我这里为4
        DataStreamSource numberStream = env.addSource(new MyParallelSource());
        DataStream dataStream = numberStream.map(new MapFunction() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        DataStream filterDataStream = dataStream.filter(new FilterFunction() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        });
        filterDataStream.print().setParallelism(1);
        env.execute();
    }
}

执行输出:

接受到了数据:1
接受到了数据:1
接受到了数据:1
接受到了数据:1
接受到了数据:2
接受到了数据:2
接受到了数据:2
接受到了数据:2
2
2
2
2
接受到了数据:3
接受到了数据:3
接受到了数据:3
接受到了数据:3
接受到了数据:4
接受到了数据:4
接受到了数据:4
接受到了数据:4
4
4
4
4

可以看到,如果不设置并行度,Source默认并行度为cpu core数,我这里是4。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5700266.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存