- 有默认值,等于cpu 数量
- getter and setter方法
- 最大并行度: 0 < maxParallelism <= 2^15 - 1
- setRestartStrategy()
- getRestartStrategy()
- isChainingEnabled
- disableOperatorChaining
- setNumberOfExecutionRetries
- getNumberOfExecutionRetries
默认为处理时间
-
setStreamTimeCharacteristic(时间类型)
- 处理时间时AutoWatermarkInterval水印间隔为0ms
- 事件时间时AutoWatermarkInterval水印间隔为200ms
-
checkpointingMode :EXACTLY_ONCE、AT_LEAST_ONCE
- getCheckpointingMode()
-
checkpointTimeout
-
CheckpointInterval
- enableCheckpointing
-
forceCheckpointing
- isForceCheckpointing
- setForceCheckpointing
-
PauseBetweenCheckpoints
- minPauseBetweenCheckpoints
- maxPauseBetweenCheckpoints
-
maxConcurrentCheckpoints
-
failOnCheckpointingErrors
- isFailOnCheckpointingErrors
-
ExternalizedCheckpointCleanup
允许外部持久化检查点
枚举值有:
- DELETE_ON_CANCELLATION:job取消时删除持久化的检查点和状态。取消作业后,无法从外部的检查点恢复。
- RETAIN_ON_CANCELLATION:job取消时保留持久化的检查点和状态,取消作业后,必须手动删除删除检查点元数据和状态
方法:
enableExternalizedCheckpoints(cleanupMode) 设置检查点持久化存储的模式
isExternalizedCheckpointsEnabled() 是否允许持久化到外部存储。
getExternalizedCheckpointCleanup() 返回持久化外部存储的模式
-
描述如何存储和设置算子状态和检查点
-
定义了在执行期间 检查点数据和状态将会被持久化的数据结构(可以是hashtable、RockDB、其他数据存储)
- StreamExecutionEnvironment.getExecutionEnvironment
- StreamExecutionEnvironment.createLocalEnvironment
- StreamExecutionEnvironment.createRemoteEnvironment
env.generateSequence(long from,long to) 生成从from到to的数字序列,并将序列数据作为输入数据源。如果并行度设置为1,则生成的元素序列是有序的。
fromElements从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。并行度为1。
对于不确定的泛型类,可能需要显示指定 TypeInformation。
fromCollection从一个给定的非空集合或者迭代器中创建数据流,并行度为1。
fromParallelCollection创建一个包含了可拆分的迭代器中元素的数据流,允许框架创建一个并行数据流来返回迭代器中的元素。
迭代器在实际执行之前不会被修改,因此迭代器返回的数据类型必须以Type类的形式显示给出(因为JAVA编译器会删除泛型信息)
基于文件的数据源 readTextFile- 一行一行的读取给定的文件,并创建一个数据流,包含一个包含每行内容的字符串。将使用系统的默认字符集或者指定字符集读取文件。
- readTextFile调用了
readFile(TextInputFormat(路径, 字符集, 分隔符), 文件处理模式.处理一次, 时间间隔, 以文本类型读取文件)
方法 - readFile()方法调用了
createFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)
方法。 - createFileInput调用addSource()方法。
按照指定的文件格式TextInputFormat读取文件。
createFileInputcreateFileInput(TextInputFormat(路径, 字符集, 分隔符),typeInformation, 文件处理模式, 时间间隔)
// 创建一个以传入时间为间隔的定期文件扫描监控器
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
// 创建一个ready算子
ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);
// 创建输入源,并发度读取environment中的并发配置。
SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName).transform("Split Reader: " + sourceName, typeInfo, reader);
FileProcessingMode有两种,
- PROCESS_ONCE 处理路径上的当前内容,然后退出。
- PROCESS_CONTINUOUSLY 定期扫描路径以获取新内容。
根据TextInputFormat创建一个输入流的通用方法
基于Socket的数据源 socketTextStream从一个socket中创建一个包含了接收到的无线的字符串的数据流,接收到的字符串是被系统默认decode的。socket本身不会报告终止,结果就是只有socket 优雅终止时会启动重试。
socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
addSource 添加数据源
添加一个数据源到数据流的拓扑图中。
默认并发为1,如果想要并发执行,可以实现ParallelSourceFunction 或者继承RichParallelSourceFunction
function – the user defined function
sourceName – Name of the data source
typeInfo – the user defined type information for the stream
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (typeInfo == null) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
StreamSource<OUT, ?> sourceOperator;
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
任务运行
execute
触发程序执行。环境将会执行程序的sink算子的所有上下游。sink算子有两种,打印结果集或者发送结果集到消息队列中。
可以显示指定jobName,默认为“Flink Streaming Job”.
getStreamGraph获取流式任务的streamGraph。
- 取environment中的 状态后端,是否链式优化,环境上下文 作为参数构造一个流程图
- 将sinks算子之前的transformList进行迭代,在迭代中设置streamGraph的bufferTimeout、transofrmationUID、transformationUserHash、source
-
运行程序是创建一个计划,返回执行的data flow graph。
-
这个方法在计划执行之前,需要被调用。
-
返回JSON字符串。
cast2StoppableSourceFunction
clean返回给定函数的“closure cleaned”版本。仅当ExecutionConfig中未禁用闭包清理时才进行清理
addOperator添加一个算子到转换list中。List
并不一定是用户调用,创建算子的API方法必须调用这个方法。
registerCacheFile在分布式缓存中注册一个文件,这个文件在运行的时候将会被所有的用户自定义的function 作为一个本地localPath访问。
文件可以是本地文件,也可以是分布式文件。如果有必要,运行时将会临时拷贝文件到本地缓存中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)