如图所示,StreamOperator的定义如下
图1- StreamOperator接口
StreamOperator接口主要包括如下核心方法
open():定义当前Operator的初始化方法,在数据元素正式接入Operator运算之前,Task会调用StreamOperator.open()方法对该算子进行初始化,具体open()方法的定义由子类实现,常见的用法如调用RichFunction中的open()方法创建相应的状态变量。close():当所有的数据元素都添加到当前Operator时,就会调用该方法刷新所有剩余的缓冲数据,保证算子中所有数据被正确处理。dispose():算子生命周期结束时会调用此方法,包括算子 *** 作执行成功、失败或者取消时。prepareSnapshotPreBarrier():在StreamOperator正式执行checkpoint *** 作之前会调用该方法,目前仅在MapBundleOperator算子中使用该方法。snapshotState():当SubTask执行checkpoint *** 作时会调用该方法,用于触发该Operator中状态数据的快照 *** 作。initializeState():当算子启动或重启时,调用该方法初始化状态数据,当恢复作业任务时,算子会从检查点(checkpoint)持久化的数据中恢复状态数据。 1.1 AbstractStreamOperator的基本实现
图 1.1-1 - AbstractStreamOperator抽象类
abstactStreamOperator作为StreamOperator的基本实现类,所有的Operator都会继承和实现该抽象实现类。在AbstractStreamOperator中定义了Operator用到的基础方法和成员信息
AbstractStreamOperator
图 1.1-2 - AbstractStreamOperator核心属性
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; private transient StreamTask, ?> container; protected transient StreamConfig config; protected transient Output1.2 AbstractUdfStreamOperator基本实现> output; private transient StreamingRuntimeContext runtimeContext; private transient KeySelector, ?> stateKeySelector1; private transient KeySelector, ?> stateKeySelector2; private transient StreamOperatorStateHandler stateHandler; private transient InternalTimeServiceManager> timeServiceManager; protected transient OperatorMetricGroup metrics; protected transient LatencyStats latencyStats; protected transient ProcessingTimeService processingTimeService; private long combinedWatermark = Long.MIN_VALUE; private long input1Watermark = Long.MIN_VALUE; private long input2Watermark = Long.MIN_VALUE;
当StreamOperator涉及自定义用户函数数据转换处理时,对应的Operator会继承AbstractUdfStreamOperator抽象实现类,常见的有StreamMap、CoProcessOperator等算子。
AbstractUdfStreamOperator继承自AbstractStreamOperator抽象类,对于AbstractUdfStreamOperator抽象类来讲,最重要的拓展就是增加了成员变量userFunction,且提供了userFunction初始化以及状态持久化的抽象方法。下面我们简单介绍AbstractUdfStreamOperator提供的主要方法。
图 1.2-1 - AbstractUdfStreamOperator新增成员变量
图 1.2-2- AbstractUdfStreamOperator类图概览
AbstractUdfStreamOperator核心方法介绍
@PublicEvolving public abstract class AbstractUdfStreamOperatorextends AbstractStreamOperator implements OutputTypeConfigurable { @Override public void setup( StreamTask, ?> containingTask, StreamConfig config, Output > output) { super.setup(containingTask, config, output); FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext()); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction); } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); StreamingFunctionUtils.restoreFunctionState(context, userFunction); } @Override public void open() throws Exception { super.open(); FunctionUtils.openFunction(userFunction, new Configuration()); } }
可以看出,当用户自定义实现Function时,在AbstractUdfStreamOperator抽象类中提供了对这些Function的初始化 *** 作,也就实现了Operator和Function之间的关联。Operator也是Function的载体,具体数据处理 *** 作借助Operator中的Function进行。StreamOperator提供了执行Function的环境,包括状态数据管理和处理Watermark、LatencyMarker等信息。
1.3 OneInputStreamOperator与TwoInputStreamOperatorStreamOperator根据输入流的数量分为两种类型,即支持单输入流的OneInputStreamOperator以及支持双输入流的TwoInputStreamOperator,我们可以将其称为一元输入算子和二元输入算子。下面介绍OneInputStreamOperator和TwoInputStreamOperator的区别。
图 1.3-1 - OneInputStreamOperator和TwoInputStreamOperator
1.3.1 OneInputStreamOperator的实现OneInputStreamOperator定义了单输入流的StreamOperator,常见的实现类有StreamMap、StreamFilter等算子。OneInputStreamOperator接口主要包含以下方法,专门用于处理接入的单输入数据流
图 1.3-2 - OneInputStreamOperator
图 1.3-3 - OneInputStreamOperator
介绍StreamFilte这个算子来看OneInputStreamOperator这个接口的实现方式。
StreamFilter算子在继承AbstractUdfStreamOperator的同时,实现了OneInputStreamOperator接口。在StreamFilter算子构造器中,内部的Function类型为FilterFunction,并设定上下游算子的链接策略为ChainingStrategy.ALWAYS,也就是该类型的Operator通常都会与上下游的Operator连接在一起,形成OperatorChain。在StreamFilter中实现了OneInputStreamOperator的processElement()方法,通过该方法定义了具体的数据元素处理逻辑。实际上就是使用定义的filterFunction对接入的数据进行筛选,然后通过output.collect(element)方法将符合的条件输出到下游算子中。
图 1.3-4 - StreamFilter
1.3.2 TwoInputStreamOperator的实现TwoInputStreamOperator定义了双输入流类型的StreamOperator接口实现,常见的实现类有CoStreamMap、HashJoinOperator等算子
TwoInputStreamOperator接口定义
图1.3 -5 TwoInputStreamOperator接口定义
TwoInputStreamOperator接口定义主要方法
void processElement1(StreamRecordelement) throws Exception; void processElement2(StreamRecord element) throws Exception; void processWatermark1(Watermark mark) throws Exception; void processWatermark2(Watermark mark) throws Exception; void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception; void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
以CoStreamMap为例,从CoStreamMap算子定义中可以看出,CoStreamMap继承AbstractUdfStreamOperator的同时,实现了TwoInputStreamOperator接口。其中在processElement1()和processElement2()两个方法的实现中,分别调用了用户定义的CoMapFunction的map1()和map2()方法对输入的数据元素Input1和Input2进行处理。经过函数处理后的结果会通过output.collect()接口推送到下游的Operator中。
图2 CoStreamMap类图
CoStreamMap的具体实现
@Internal public class CoStreamMap1.4 StreamOperatorFactory详解extends AbstractUdfStreamOperator > implements TwoInputStreamOperator { private static final long serialVersionUID = 1L; public CoStreamMap(CoMapFunction mapper) { super(mapper); } @Override public void processElement1(StreamRecord element) throws Exception { output.collect(element.replace(userFunction.map1(element.getValue()))); } @Override public void processElement2(StreamRecord element) throws Exception { output.collect(element.replace(userFunction.map2(element.getValue()))); } }
StreamOperator最终会通过StreamOperatorFactory封装在Transformation结构中,并存储在StreamGraph和JobGraph结构中,直到运行时执行StreamTask时,才会调用StreamOperatorFactory.createStreamOperator()方法在StreamOperatorFactory中定义StreamOperator实例。
通过StreamOperatorFactory封装创建StreamOperator的 *** 作,在DataStreamAPI中主要通过SimpleOperatorFactory创建已经定义的Operator,而在Table API模块中主要通过CodeGenOperatorFactory从代码中动态编译并创建Operator实例。SimpleOperatorFactory和CodeGenOperatorFactory都是StreamOperatorFactory的实现类。
DataStream API中大部分转换 *** 作都是通过SimpleOperatorFactory进行封装和创建的。SimpleOperatorFactory根据算子类型的不同,拓展出了InputFormatOperatorFactory、UdfStreamOperatorFactory和OutputFormatOperatorFactory三种接口实现。
SimpleInputFormatOperatorFactory:支持创建InputFormat类型输入的StreamSource算子,即SourceFunction为InputFormatSourceFunction类型,并提供getInputFormat()方法生成StreamGraph。SimpleUdfStreamOperatorFactory:支持AbstractUdfStreamOperator类型的Operator创建,并且在SimpleUdfStreamOperatorFactory中提供了获取UserFunction的方法。SimpleOutputFormatOperatorFactory:支持创建OutputFormat类型输出的StreamSink算子,即SinkFunction为OutputFormatSinkFunction类型,并提供getOutputFormat()方法生成StreamGraph。
图1.4-1 CoStreamMap类图
SimpleOperatorFactory
@Internal public class SimpleOperatorFactoryextends AbstractStreamOperatorFactory { private final StreamOperator operator; @SuppressWarnings("unchecked") public static SimpleOperatorFactory of(StreamOperator operator) { if (operator == null) { return null; } else if (operator instanceof StreamSource && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) { return new SimpleInputFormatOperatorFactory ((StreamSource) operator); } else if (operator instanceof StreamSink && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) { return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator); } else if (operator instanceof AbstractUdfStreamOperator) { return new SimpleUdfStreamOperatorFactory ((AbstractUdfStreamOperator) operator); } else { return new SimpleOperatorFactory<>(operator); } } }
从SimpleOperatorFactory.of()方法定义中可以看出,基于StreamOperator提供的of()方法对算子进行工厂类的封装,实现将Operator封装在OperatorFactory中。然后根据Operator类型的不同,创建不同的SimpleOperatorFactory实现类,例如当Operator类型为StreamSource且UserFunction定义属于InputFormatSourceFunction时,就会创建SimpleInputFormatOperatorFactory实现类,其他情况类似。
@Internal public class SimpleOperatorFactoryextends AbstractStreamOperatorFactory { //其他代码忽略 @SuppressWarnings("unchecked") @Override public > T createStreamOperator( StreamOperatorParameters parameters) { if (operator instanceof AbstractStreamOperator) { ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService); } if (operator instanceof SetupableStreamOperator) { ((SetupableStreamOperator) operator) .setup( parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } return (T) operator; } }
在集群中执行该算子时,首先会调用SimpleOperatorFactory.createStreamOperator()方法创建StreamOperator实例。如果算子同时实现了SetupableStreamOperator接口,则会调用setup()方法对算子进行基本的设置。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)