Apache Flink源码解析-StreamOperator接口实现

Apache Flink源码解析-StreamOperator接口实现,第1张

Apache Flink源码解析-StreamOperator接口实现 1、StreamOperator接口实现

如图所示,StreamOperator的定义如下

图1- StreamOperator接口

StreamOperator接口主要包括如下核心方法

open():定义当前Operator的初始化方法,在数据元素正式接入Operator运算之前,Task会调用StreamOperator.open()方法对该算子进行初始化,具体open()方法的定义由子类实现,常见的用法如调用RichFunction中的open()方法创建相应的状态变量。close():当所有的数据元素都添加到当前Operator时,就会调用该方法刷新所有剩余的缓冲数据,保证算子中所有数据被正确处理。dispose():算子生命周期结束时会调用此方法,包括算子 *** 作执行成功、失败或者取消时。prepareSnapshotPreBarrier():在StreamOperator正式执行checkpoint *** 作之前会调用该方法,目前仅在MapBundleOperator算子中使用该方法。snapshotState():当SubTask执行checkpoint *** 作时会调用该方法,用于触发该Operator中状态数据的快照 *** 作。initializeState():当算子启动或重启时,调用该方法初始化状态数据,当恢复作业任务时,算子会从检查点(checkpoint)持久化的数据中恢复状态数据。 1.1 AbstractStreamOperator的基本实现

图 1.1-1 - AbstractStreamOperator抽象类

abstactStreamOperator作为StreamOperator的基本实现类,所有的Operator都会继承和实现该抽象实现类。在AbstractStreamOperator中定义了Operator用到的基础方法和成员信息

AbstractStreamOperator

 图 1.1-2 - AbstractStreamOperator核心属性

 
  protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

  
  private transient StreamTask container;

  
  protected transient StreamConfig config;

  
  protected transient Output> output;

  
  private transient StreamingRuntimeContext runtimeContext;

  
  private transient KeySelector stateKeySelector1;

  
  private transient KeySelector stateKeySelector2;

  private transient StreamOperatorStateHandler stateHandler;

  
  private transient InternalTimeServiceManager timeServiceManager;

  
  
  protected transient OperatorMetricGroup metrics;

  
  protected transient LatencyStats latencyStats;

  
  protected transient ProcessingTimeService processingTimeService;

  
  private long combinedWatermark = Long.MIN_VALUE;

  
  private long input1Watermark = Long.MIN_VALUE;

  
  private long input2Watermark = Long.MIN_VALUE;
1.2 AbstractUdfStreamOperator基本实现

当StreamOperator涉及自定义用户函数数据转换处理时,对应的Operator会继承AbstractUdfStreamOperator抽象实现类,常见的有StreamMap、CoProcessOperator等算子。

AbstractUdfStreamOperator继承自AbstractStreamOperator抽象类,对于AbstractUdfStreamOperator抽象类来讲,最重要的拓展就是增加了成员变量userFunction,且提供了userFunction初始化以及状态持久化的抽象方法。下面我们简单介绍AbstractUdfStreamOperator提供的主要方法。

 图 1.2-1 - AbstractUdfStreamOperator新增成员变量

  图 1.2-2- AbstractUdfStreamOperator类图概览

AbstractUdfStreamOperator核心方法介绍

@PublicEvolving
public abstract class AbstractUdfStreamOperator
        extends AbstractStreamOperator implements OutputTypeConfigurable {
  
  @Override
  public void setup(
      StreamTask containingTask, StreamConfig config, Output> output) {
    super.setup(containingTask, config, output);
    FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
  }

  
  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
  }

  
  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    StreamingFunctionUtils.restoreFunctionState(context, userFunction);
  }

  
  @Override
  public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
  }

}

可以看出,当用户自定义实现Function时,在AbstractUdfStreamOperator抽象类中提供了对这些Function的初始化 *** 作,也就实现了Operator和Function之间的关联。Operator也是Function的载体,具体数据处理 *** 作借助Operator中的Function进行。StreamOperator提供了执行Function的环境,包括状态数据管理和处理Watermark、LatencyMarker等信息。

1.3 OneInputStreamOperator与TwoInputStreamOperator

StreamOperator根据输入流的数量分为两种类型,即支持单输入流的OneInputStreamOperator以及支持双输入流的TwoInputStreamOperator,我们可以将其称为一元输入算子和二元输入算子。下面介绍OneInputStreamOperator和TwoInputStreamOperator的区别。

图 1.3-1 - OneInputStreamOperator和TwoInputStreamOperator

1.3.1 OneInputStreamOperator的实现

OneInputStreamOperator定义了单输入流的StreamOperator,常见的实现类有StreamMap、StreamFilter等算子。OneInputStreamOperator接口主要包含以下方法,专门用于处理接入的单输入数据流

 图 1.3-2 - OneInputStreamOperator

 图 1.3-3 - OneInputStreamOperator

介绍StreamFilte这个算子来看OneInputStreamOperator这个接口的实现方式。

StreamFilter算子在继承AbstractUdfStreamOperator的同时,实现了OneInputStreamOperator接口。在StreamFilter算子构造器中,内部的Function类型为FilterFunction,并设定上下游算子的链接策略为ChainingStrategy.ALWAYS,也就是该类型的Operator通常都会与上下游的Operator连接在一起,形成OperatorChain。在StreamFilter中实现了OneInputStreamOperator的processElement()方法,通过该方法定义了具体的数据元素处理逻辑。实际上就是使用定义的filterFunction对接入的数据进行筛选,然后通过output.collect(element)方法将符合的条件输出到下游算子中。

  图 1.3-4 - StreamFilter

1.3.2 TwoInputStreamOperator的实现

TwoInputStreamOperator定义了双输入流类型的StreamOperator接口实现,常见的实现类有CoStreamMap、HashJoinOperator等算子

TwoInputStreamOperator接口定义

   图1.3 -5  TwoInputStreamOperator接口定义

TwoInputStreamOperator接口定义主要方法

    
    void processElement1(StreamRecord element) throws Exception;

    
    void processElement2(StreamRecord element) throws Exception;

    
    void processWatermark1(Watermark mark) throws Exception;

    
    void processWatermark2(Watermark mark) throws Exception;

    
    void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;

    
    void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;

以CoStreamMap为例,从CoStreamMap算子定义中可以看出,CoStreamMap继承AbstractUdfStreamOperator的同时,实现了TwoInputStreamOperator接口。其中在processElement1()和processElement2()两个方法的实现中,分别调用了用户定义的CoMapFunction的map1()和map2()方法对输入的数据元素Input1和Input2进行处理。经过函数处理后的结果会通过output.collect()接口推送到下游的Operator中。 

   图2 CoStreamMap类图 

 CoStreamMap的具体实现

@Internal
public class CoStreamMap
        extends AbstractUdfStreamOperator>
        implements TwoInputStreamOperator {

    private static final long serialVersionUID = 1L;

    public CoStreamMap(CoMapFunction mapper) {
        super(mapper);
    }

    @Override
    public void processElement1(StreamRecord element) throws Exception {
        output.collect(element.replace(userFunction.map1(element.getValue())));
    }

    @Override
    public void processElement2(StreamRecord element) throws Exception {
        output.collect(element.replace(userFunction.map2(element.getValue())));
    }
}
1.4 StreamOperatorFactory详解

StreamOperator最终会通过StreamOperatorFactory封装在Transformation结构中,并存储在StreamGraph和JobGraph结构中,直到运行时执行StreamTask时,才会调用StreamOperatorFactory.createStreamOperator()方法在StreamOperatorFactory中定义StreamOperator实例。

通过StreamOperatorFactory封装创建StreamOperator的 *** 作,在DataStreamAPI中主要通过SimpleOperatorFactory创建已经定义的Operator,而在Table API模块中主要通过CodeGenOperatorFactory从代码中动态编译并创建Operator实例。SimpleOperatorFactory和CodeGenOperatorFactory都是StreamOperatorFactory的实现类。

DataStream API中大部分转换 *** 作都是通过SimpleOperatorFactory进行封装和创建的。SimpleOperatorFactory根据算子类型的不同,拓展出了InputFormatOperatorFactory、UdfStreamOperatorFactory和OutputFormatOperatorFactory三种接口实现。

SimpleInputFormatOperatorFactory:支持创建InputFormat类型输入的StreamSource算子,即SourceFunction为InputFormatSourceFunction类型,并提供getInputFormat()方法生成StreamGraph。SimpleUdfStreamOperatorFactory:支持AbstractUdfStreamOperator类型的Operator创建,并且在SimpleUdfStreamOperatorFactory中提供了获取UserFunction的方法。SimpleOutputFormatOperatorFactory:支持创建OutputFormat类型输出的StreamSink算子,即SinkFunction为OutputFormatSinkFunction类型,并提供getOutputFormat()方法生成StreamGraph。

    图1.4-1 CoStreamMap类图 

 SimpleOperatorFactory

@Internal
public class SimpleOperatorFactory extends AbstractStreamOperatorFactory {

    private final StreamOperator operator;

    
    @SuppressWarnings("unchecked")
    public static  SimpleOperatorFactory of(StreamOperator operator) {
        if (operator == null) {
            return null;
        } else if (operator instanceof StreamSource
                && ((StreamSource) operator).getUserFunction()
                        instanceof InputFormatSourceFunction) {
            return new SimpleInputFormatOperatorFactory((StreamSource) operator);
        } else if (operator instanceof StreamSink
                && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
            return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
        } else if (operator instanceof AbstractUdfStreamOperator) {
            return new SimpleUdfStreamOperatorFactory((AbstractUdfStreamOperator) operator);
        } else {
            return new SimpleOperatorFactory<>(operator);
        }
    }
}

从SimpleOperatorFactory.of()方法定义中可以看出,基于StreamOperator提供的of()方法对算子进行工厂类的封装,实现将Operator封装在OperatorFactory中。然后根据Operator类型的不同,创建不同的SimpleOperatorFactory实现类,例如当Operator类型为StreamSource且UserFunction定义属于InputFormatSourceFunction时,就会创建SimpleInputFormatOperatorFactory实现类,其他情况类似。

@Internal
public class SimpleOperatorFactory extends AbstractStreamOperatorFactory {
  
//其他代码忽略
@SuppressWarnings("unchecked")
    @Override
    public > T createStreamOperator(
            StreamOperatorParameters parameters) {
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator)
                    .setup(
                            parameters.getContainingTask(),
                            parameters.getStreamConfig(),
                            parameters.getOutput());
        }
        return (T) operator;
    }
}

在集群中执行该算子时,首先会调用SimpleOperatorFactory.createStreamOperator()方法创建StreamOperator实例。如果算子同时实现了SetupableStreamOperator接口,则会调用setup()方法对算子进行基本的设置。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717519.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存