Flink源码之StreamExecutionEnvironment

Flink源码之StreamExecutionEnvironment,第1张

StreamExecutionEnvironment ExecutionConfig 并行度 Parallelism
  1. 有默认值,等于cpu 数量
  2. getter and setter方法
  3. 最大并行度: 0 < maxParallelism <= 2^15 - 1
重启策略 RestartStrategies
  1. setRestartStrategy()
  2. getRestartStrategy()
链化优化 isChainingEnabled
  1. isChainingEnabled
  2. disableOperatorChaining
重试次数 numberOfExecutionRetries
  1. setNumberOfExecutionRetries
  2. getNumberOfExecutionRetries
TimeCharacteristic

默认为处理时间

  1. setStreamTimeCharacteristic(时间类型)

    1. 处理时间时AutoWatermarkInterval水印间隔为0ms
    2. 事件时间时AutoWatermarkInterval水印间隔为200ms
CheckpointConfig
  1. checkpointingMode :EXACTLY_ONCE、AT_LEAST_ONCE

    1. getCheckpointingMode()
  2. checkpointTimeout

  3. CheckpointInterval

    1. enableCheckpointing
  4. forceCheckpointing

    1. isForceCheckpointing
    2. setForceCheckpointing
  5. PauseBetweenCheckpoints

    1. minPauseBetweenCheckpoints
    2. maxPauseBetweenCheckpoints
  6. maxConcurrentCheckpoints

  7. failOnCheckpointingErrors

    1. isFailOnCheckpointingErrors
  8. ExternalizedCheckpointCleanup

    允许外部持久化检查点

    枚举值有:

    1. DELETE_ON_CANCELLATION:job取消时删除持久化的检查点和状态。取消作业后,无法从外部的检查点恢复。
    2. RETAIN_ON_CANCELLATION:job取消时保留持久化的检查点和状态,取消作业后,必须手动删除删除检查点元数据和状态

    方法:

    enableExternalizedCheckpoints(cleanupMode) 设置检查点持久化存储的模式

    isExternalizedCheckpointsEnabled() 是否允许持久化到外部存储。

    getExternalizedCheckpointCleanup() 返回持久化外部存储的模式

StateBackend
  1. 描述如何存储和设置算子状态和检查点

  2. 定义了在执行期间 检查点数据和状态将会被持久化的数据结构(可以是hashtable、RockDB、其他数据存储)

主要方法 创建environment
  1. StreamExecutionEnvironment.getExecutionEnvironment
  2. StreamExecutionEnvironment.createLocalEnvironment
  3. StreamExecutionEnvironment.createRemoteEnvironment
数据源 基于集合的数据源 generateSequence

env.generateSequence(long from,long to) 生成从from到to的数字序列,并将序列数据作为输入数据源。如果并行度设置为1,则生成的元素序列是有序的。

fromElements

从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。并行度为1。

对于不确定的泛型类,可能需要显示指定 TypeInformation。

fromCollection

从一个给定的非空集合或者迭代器中创建数据流,并行度为1。

fromParallelCollection

创建一个包含了可拆分的迭代器中元素的数据流,允许框架创建一个并行数据流来返回迭代器中的元素。

迭代器在实际执行之前不会被修改,因此迭代器返回的数据类型必须以Type类的形式显示给出(因为JAVA编译器会删除泛型信息)

基于文件的数据源 readTextFile
  1. 一行一行的读取给定的文件,并创建一个数据流,包含一个包含每行内容的字符串。将使用系统的默认字符集或者指定字符集读取文件。
  2. readTextFile调用了readFile(TextInputFormat(路径, 字符集, 分隔符), 文件处理模式.处理一次, 时间间隔, 以文本类型读取文件)方法
  3. readFile()方法调用了createFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)方法。
  4. createFileInput调用addSource()方法。
readFile

按照指定的文件格式TextInputFormat读取文件。

createFileInput

createFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)

// 创建一个以传入时间为间隔的定期文件扫描监控器
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// 创建一个ready算子
ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);
// 创建输入源,并发度读取environment中的并发配置。
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, reader);

FileProcessingMode有两种,

  1. PROCESS_ONCE 处理路径上的当前内容,然后退出。
  2. PROCESS_CONTINUOUSLY 定期扫描路径以获取新内容。
createInput

根据TextInputFormat创建一个输入流的通用方法

基于Socket的数据源 socketTextStream

从一个socket中创建一个包含了接收到的无线的字符串的数据流,接收到的字符串是被系统默认decode的。socket本身不会报告终止,结果就是只有socket 优雅终止时会启动重试。

socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
addSource 添加数据源

添加一个数据源到数据流的拓扑图中。

默认并发为1,如果想要并发执行,可以实现ParallelSourceFunction 或者继承RichParallelSourceFunction
function – the user defined function
sourceName – Name of the data source
typeInfo – the user defined type information for the stream

@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
	if (typeInfo == null) {
		if (function instanceof ResultTypeQueryable) {
			typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
		} else {
			try {
				typeInfo = TypeExtractor.createTypeInfo(
						SourceFunction.class,
						function.getClass(), 0, null, null);
			} catch (final InvalidTypesException e) {
				typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
			}
		}
	}

	boolean isParallel = function instanceof ParallelSourceFunction;

	clean(function);
	StreamSource<OUT, ?> sourceOperator;
	if (function instanceof StoppableFunction) {
		sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
	} else {
		sourceOperator = new StreamSource<>(function);
	}

	return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
任务运行 execute

触发程序执行。环境将会执行程序的sink算子的所有上下游。sink算子有两种,打印结果集或者发送结果集到消息队列中。

可以显示指定jobName,默认为“Flink Streaming Job”.

getStreamGraph

获取流式任务的streamGraph。

  1. 取environment中的 状态后端,是否链式优化,环境上下文 作为参数构造一个流程图
  2. 将sinks算子之前的transformList进行迭代,在迭代中设置streamGraph的bufferTimeout、transofrmationUID、transformationUserHash、source
getExecutionPlan
  1. 运行程序是创建一个计划,返回执行的data flow graph。

  2. 这个方法在计划执行之前,需要被调用。

  3. 返回JSON字符串。

cast2StoppableSourceFunction

clean

返回给定函数的“closure cleaned”版本。仅当ExecutionConfig中未禁用闭包清理时才进行清理

addOperator

添加一个算子到转换list中。List> transformations

并不一定是用户调用,创建算子的API方法必须调用这个方法。

registerCacheFile

在分布式缓存中注册一个文件,这个文件在运行的时候将会被所有的用户自定义的function 作为一个本地localPath访问。

文件可以是本地文件,也可以是分布式文件。如果有必要,运行时将会临时拷贝文件到本地缓存中。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/725544.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存