大家好,我是雷恩Layne,这是《深入浅出flink》系列的第四篇文章,我旨在用最直白的语言写好flink,希望能让所有看到的人一目了然。如果大家喜欢,欢迎点赞、关注,也欢迎留言,共同交流flink的点点滴滴 O(∩_∩)O
文章目录1. Source简介2. Flink预定义的Source
2.1 基于文件的Source2.2 基于Socket的Source2.3 基于集合的Source2.4 基于Kafka的Source 3. 自定义单并行度Source4. 自定义多并行度Source
DataStream是Flink的较低级API,用于进行数据的实时处理任务,可以将该编程模型分为Source、Transformation、Sink三个部分,如下图所示。本文来介绍常用的并行度Source和多并行度Source。
1. Source简介source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)
来为你的程序添加一个source。
flink提供了大量的已经实现好的source方法,也可以自定义source:
- 通过实现sourceFunction接口来自定义无并行度的source通过实现ParallelSourceFunction 接口or 继承RichParallelSourceFunction 来自定义有并行度的
source
大多数情况下,我们使用自带的source即可。
2. Flink预定义的Sourceflink提供了大量的已经实现好的source,常见的有:
基于文件的Source基于Socket的Source基于集合的Source基于Kafka的Source 2.1 基于文件的Source
常见基于文件的Source有如下四种:
//1.逐行读取文本文件(文件符合 TextInputFormat 格式),并作为字符串返回每一行。 readTextFile(String path) //2.按指定的文件输入格式(fileInputFormat)读取指定路径的文件。 readFile(FileInputFormatinputFormat,String path) //3.前两个方法的内部调用方法 readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter) //4.处理流文件 readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
前两个好理解,这里着重讲解一下第三个和第四个方法。
第三个方法是前两个方法的内部调用方法,总共有四个参数,具体含义如下:
hdfs://host:port/file/path
FileProcessingMode.PROCESS_CONTINUOUSLY
watchType有两种策略:
FileProcessingMode.PROCESS_ONCE:定期监听路径下的新数据FileProcessingMode.PROCESS_CONTINUOUSLY:处理当前contents中的文件然后退出(只处理一次)
第四个方法是用来支持读取流式文件的,方法中intervalMilli是轮询的间隔,watchType有三种策略:
FileMonitoringFunction.WatchType.ONLY_NEW_FILES:只处理新文件FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED :只处理追加的内容FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED :既要处理追加的内容,也会处理file中之前的内容 2.2 基于Socket的Source
常见基于Socket的Source主要有两种:
//1.逐行从socket中读取内容 socketTextStream(String hostname, int port) //2.按照给定分隔符读取socket的内容,默认为"n" socketTextStream(String hostname, int port, String delimiter)
socketTextStream比较简单,没啥好说的。
2.3 基于集合的Source常见基于集合的Source有五种:
//1.用 Java.util.Collection 对象创建数据流,集合中的所有元素必须属于同一类型 fromCollection(Seq) //2.用迭代器创建数据流。指定迭代器返回的元素的数据类型 fromCollection(Iterator) //3.从给定的对象序列创建数据流。所有对象必须属于同一类型 fromElements(elements: _*) //4.并行地从迭代器创建数据流。指定迭代器返回的元素的数据类型; fromParallelCollection(SplittableIterator) # //5.并行生成给定间隔的数字序列。 generateSequence(from, to)
举一个比较简单的例子:
DataStream2.4 基于Kafka的Sourceds1 = env.fromElements(1, 2, 4, 67, 189); DataStream ds2 = env.fromCollection(Arrays.asList(1, 2, 4, 67, 189));
flink给给我提供了消费kafka的source,这使得我们用一行代码就可以消费kafka中的数据。
根据不同的版本,flink给我们提供了三种kafka source,分别是:
FlinkKafkaConsumer09FlinkKafkaConsumer010FlinkKafkaConsumer011
举例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "wxler1:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStream3. 自定义单并行度SourcedataStream = env.addSource(new FlinkKafkaConsumer011 ("sensor", new SimpleStringSchema(), properties)); // 打印输出 dataStream.print(); env.execute();
除了flink本身提供的source之外,我们也可以自定义source。可以通过实现sourceFunction接口来自定义无并行度的source。
示例如下:
(1)自定义Source
import org.apache.flink.streaming.api.functions.source.SourceFunction; //功能:每秒产生一条数据 public class MyNoParallelSource implements SourceFunction{ private long number = 1L; private boolean isRunning = true; @Override public void run(SourceContext sct) throws Exception { while (isRunning){ sct.collect(number); number++; //每秒生成一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning=false; } }
(2)定义Consume,消费Source的数据,并打印输出
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; //功能:打印输出偶数 public class MyNoParallelConsumer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment(); //默认并行度为1 DataStreamSourcenumberStream = env.addSource(new MyNoParallelSource()); DataStream dataStream = numberStream.map(new MapFunction () { @Override public Long map(Long value) throws Exception { System.out.println("接受到了数据:"+value); return value; } }); DataStream filterDataStream = dataStream.filter(new FilterFunction () { @Override public boolean filter(Long value) throws Exception { return value % 2 == 0; //过滤偶数 } }); filterDataStream.print().setParallelism(1); env.execute(); } }
执行输出:
接受到了数据:1 接受到了数据:2 2 接受到了数据:3 接受到了数据:4 4 接受到了数据:5 接受到了数据:6 64. 自定义多并行度Source
通过实现ParallelSourceFunction 接口or 继承RichParallelSourceFunction 来自定义有并行度的
source。
(1)自定义多并行度Source
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; //功能:自定义支持并行度的数据源 public class MyParallelSource implements ParallelSourceFunction{ private long number = 1L; private boolean isRunning = true; @Override public void run(SourceContext sct) throws Exception { while (isRunning){ sct.collect(number); number++; //每秒生成一条数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning=false; } }
(2)定义Consume,消费多并行度Source的数据,并打印输出
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; //功能:消费多并行度Source的数据,并打印输出偶数 public class MyParallelConsumer { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment(); //默认并行度为cpu core数,我这里为4 DataStreamSourcenumberStream = env.addSource(new MyParallelSource()); DataStream dataStream = numberStream.map(new MapFunction () { @Override public Long map(Long value) throws Exception { System.out.println("接受到了数据:"+value); return value; } }); DataStream filterDataStream = dataStream.filter(new FilterFunction () { @Override public boolean filter(Long value) throws Exception { return value % 2 == 0; } }); filterDataStream.print().setParallelism(1); env.execute(); } }
执行输出:
接受到了数据:1 接受到了数据:1 接受到了数据:1 接受到了数据:1 接受到了数据:2 接受到了数据:2 接受到了数据:2 接受到了数据:2 2 2 2 2 接受到了数据:3 接受到了数据:3 接受到了数据:3 接受到了数据:3 接受到了数据:4 接受到了数据:4 接受到了数据:4 接受到了数据:4 4 4 4 4
可以看到,如果不设置并行度,Source默认并行度为cpu core数,我这里是4。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)