Flink源码漫游指南<二>:全能管家StreamExecutionEnviornment

Flink源码漫游指南<二>:全能管家StreamExecutionEnviornment,第1张

Flink源码漫游指南<二>:全能管家StreamExecutionEnviornment

StreamExecutionEnvironment是stream program执行的环境,其子类LocalStreamEnvironment会让程序在当前JJVM中执行,而子类RemoteStreamEnvironment会让程序在远程集群中执行。  ——Flink官方注释

首先,我们来看一看一个典型的Flink中的wordcount程序是什么样的。

public class WordCount {

    public static void main(String[] args) throws Exception {
        //定义socket的端口号
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("没有指定port参数,使用默认值9000");
            port = 9000;
        }

        //⭐获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //⭐连接socket获取输入的数据
        DataStreamSource text = env.socketTextStream("10.192.12.106", port, "n");

        //计算数据
        DataStream windowCount = text.flatMap(new FlatMapFunction() {
            public void flatMap(String value, Collector out) throws Exception {
                String[] splits = value.split("\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        })//打平 *** 作,把每行的单词转为类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2),Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");
               
        //把数据打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一个并行度
        //⭐因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");

    }

请注意看代码中⭐标记部分,可见我们Flink程序的第一步就是获得一个StreamExecutionEnviorment,然后通过SEE提供的方法添加source来获得第一个DataStream,最后提供SEE提供的execution( )方法执行程序,在这里SEE有三个作用。

在Flink官方注释中说道:“StreamExecutionEnviorment会提供方法来控制job的执行(比如设置并行度、容错和检查点方面的参数)以及与外界交换数据”。那么,SEE类具体是如何实现的呢,本文将从Flink源码出发,一步步学习SEE的组成。

从下面的代码中,我们首先看看SEE有哪些属性(英文已做翻译)

public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";


private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;


private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;


private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;


private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();



private final ExecutionConfig config = new ExecutionConfig();


private final CheckpointConfig checkpointCfg = new CheckpointConfig();


protected final List> transformations = new ArrayList<>();

private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;


protected boolean isChainingEnabled = true;


private StateBackend defaultStateBackend;


private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;


protected final List> cacheFile = new ArrayList<>();

我们可以看到,除了一些常规的属性,还有两个重要属性:ExecutionConfigCheckpointConfig (已由⭐标记出来)

SEE的属性组成大概可以归纳成下图

我们再来分别看看这两个属性里可以定义哪些参数,首先是ExecutionConfig

private static final long serialVersionUID = 1L;


@Deprecated
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;


public static final int PARALLELISM_DEFAULT = -1;


public static final int PARALLELISM_UNKNOWN = -2;

private static final long DEFAULT_RESTART_DELAY = 10000L;

// --------------------------------------------------------------------------------------------


private ExecutionMode executionMode = ExecutionMode.PIPELINED;


private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;

private int parallelism = PARALLELISM_DEFAULT;


private int maxParallelism = -1;


@Deprecated
private int numberOfExecutionRetries = -1;

private boolean forceKryo = false;


private boolean disableGenericTypes = false;

private boolean objectReuse = false;

private boolean autoTypeRegistrationEnabled = true;

private boolean forceAvro = false;

private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;


private boolean printProgressDuringExecution = true;

private long autoWatermarkInterval = 0;


private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();

private boolean isLatencyTrackingConfigured = false;


@Deprecated
private long executionRetryDelay = DEFAULT_RESTART_DELAY;

private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
   new RestartStrategies.FallbackRestartStrategyConfiguration();

private long taskCancellationIntervalMillis = -1;


private long taskCancellationTimeoutMillis = -1;


private boolean useSnapshotCompression = false;


private boolean failTaskonCheckpointError = true;


private InputDependencyConstraint defaultInputDependencyConstraint = InputDependencyConstraint.ANY;

// ------------------------------- User code values --------------------------------------------

private GlobalJobParameters globalJobParameters;

// Serializers and types registered with Kryo and the PojoSerializer
// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.

private linkedHashMap, SerializableSerializer> registeredTypesWithKryoSerializers = new linkedHashMap<>();

private linkedHashMap, Class>> registeredTypesWithKryoSerializerClasses = new linkedHashMap<>();

private linkedHashMap, SerializableSerializer> defaultKryoSerializers = new linkedHashMap<>();

private linkedHashMap, Class>> defaultKryoSerializerClasses = new linkedHashMap<>();

private linkedHashSet> registeredKryoTypes = new linkedHashSet<>();

private linkedHashSet> registeredPojoTypes = new linkedHashSet<>();

以上可以看出,ExecutionConfig可以设置的内容包括 程序的默认并行度、执行失败时的重试次数、重启尝试之间的时间间隔、程序的执行模式—流或批、 启用或禁止“closure cleaner闭包清理器”、 是否允许register 类型和序列化器来提供处理基本类型和POJO类型的效率 等内容。

以下我们再看看checkpointConfig里有什么属性

private static final long serialVersionUID = -750378776078908147L;


public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;


public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;


public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;


public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;

// ------------------------------------------------------------------------


private CheckpointingMode checkpointingMode = DEFAULT_MODE;


private long checkpointInterval = -1; // disabled


private long checkpointTimeout = DEFAULT_TIMEOUT;


private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;


private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;


private boolean forceCheckpointing;


private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;


private boolean failonCheckpointingErrors = true;

可见CheckpointConfig中定义的都是checkpoint相关的参数和策略。

由以上三个类的属性可见StreamExecutionEnviorment+ExecutionConfig+CheckpointConfig 定义了Flink最顶层的环境配置和策略选择。

我们再回过头来看看,SEE中定义了哪些重要方法?(各种get和set方法省略,注释大部分都翻译了,重点看⭐标注的地方)

首先我们看看SEE定义了哪些获得source的方法

public DataStreamSource ⭐generateSequence(long from, long to) {
   if (from > to) {
      throw new IllegalArgumentException("Start of sequence must not be greater than the end");
   }
   return addSource(new StatefulSequenceSource(from, to), "Sequence Source");
   //⭐最后其实是使用了addSource方法
}
@SafeVarargs
public final  DataStreamSource ⭐fromElements(OUT... data) {
   if (data.length == 0) {
      throw new IllegalArgumentException("fromElements needs at least one element as argument");
   }

   TypeInformation typeInfo;
   try {
      typeInfo = TypeExtractor.getForObject(data[0]);
   }
   catch (Exception e) {
      throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
            + "; please specify the TypeInformation manually via "
            + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
   }
   //⭐最后实际使用了fromCollection方法
   return fromCollection(Arrays.asList(data), typeInfo);
}
public  DataStreamSource ⭐fromCollection(Collection data, TypeInformation typeInfo) {
   Preconditions.checkNotNull(data, "Collection must not be null");

   // must not have null elements and mixed elements
   FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

   SourceFunction function;
   try {
      function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
   }
   catch (IOException e) {
      throw new RuntimeException(e.getMessage(), e);
   }
   //⭐实际使用了addSource方法
   return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
private  DataStreamSource ⭐fromParallelCollection(SplittableIterator iterator, TypeInformation
			typeInfo, String operatorName) {
		//⭐实际上使用的是addsource
		return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
	}
@PublicEvolving
public DataStreamSource ⭐socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
   //⭐最后使用的是addSource
   return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
         "Socket Stream");
}
public DataStreamSource ⭐readTextFile(String filePath, String charsetName) {
   Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceonly(filePath), "The file path must not be null or blank.");

   TextInputFormat format = new TextInputFormat(new Path(filePath));
   format.setFilesFilter(FilePathFilter.createDefaultFilter());
   TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
   format.setCharsetName(charsetName);

   //⭐最终执行的是readFile方法
   return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
@PublicEvolving
public  DataStreamSource ⭐readFile(FileInputFormat inputFormat,
                                 String filePath,
                                 FileProcessingMode watchType,
                                 long interval,
                                 TypeInformation typeInformation) {

   Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
   Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceonly(filePath), "The file path must not be null or blank.");

   inputFormat.setFilePath(filePath);
   //⭐最后实际上使用的是createFileInput()方法
   return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
}

根据以上添加source的方法的调用关系,我们可以得到下面这张图

 可以看到,最终是通过addSource和createFileInput两个方法创建了新的DataStream,那么我们再来看看这两个方法是怎么实现的吧

@SuppressWarnings("unchecked")
public  DataStreamSource ⭐addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) {

   //⭐注意:typeInfo是可以为null的,可以从Sourcefunction提取type或者从SourceFunction创建type
   if (typeInfo == null) {
      if (function instanceof ResultTypeQueryable) {
         typeInfo = ((ResultTypeQueryable) function).getProducedType();
      } else {
         try {
            typeInfo = TypeExtractor.createTypeInfo(
                  SourceFunction.class,
                  function.getClass(), 0, null, null);
         } catch (final InvalidTypesException e) {
            typeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e);
         }
      }
   }

   boolean isParallel = function instanceof ParallelSourceFunction;

   clean(function);
   StreamSource sourceOperator;
   //⭐创建SourceOperator
   if (function instanceof StoppableFunction) {
      sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
   } else {
      sourceOperator = new StreamSource<>(function);
   }

   //⭐创建DataStream并返回(注:DataStreamSource是DataStream的子类)
   return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
private  DataStreamSource ⭐createFileInput(FileInputFormat inputFormat,
                                       TypeInformation typeInfo,
                                       String sourceName,
                                       FileProcessingMode monitoringMode,
                                       long interval) {

   //⭐检查参数非空
   Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
   Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
   Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
   Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

   //⭐如果FileProcessingMode是PROCESS_CONTINUOUSLY(只进行一次全量文件导入)且 扫描间隔小于规定的最小interval则报错
   //⭐Preconditions.checkArgument()会检查第一个参数是否为true,如果不是true则报错
   Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
         interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
      "The path monitoring interval cannot be less than " +
            ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

   //⭐封装参数
   ContinuousFileMonitoringFunction monitoringFunction =
      new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

   ContinuousFileReaderOperator reader =
      new ContinuousFileReaderOperator<>(inputFormat);

   SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName)
         .transform("Split Reader: " + sourceName, typeInfo, reader);

   //⭐创建和返回DataStream
   return new DataStreamSource<>(source);
}

以上,我们介绍完了SEE中创建DataStream的方法,至于最后DataStream的具体实现请见下一篇文章

最后我们再来看看SEE的execute()方法,因为SEE抽象类中定义的execute()方法大多会被具体的子类override,所以我们不看SEE中的该方法,直接看子类的实现。以下我们以SEE的一个子类 LocalStreamEnvironment 来探究execute中做了什么。

@Override
public JobExecutionResult execute(String jobName) throws Exception {
   // ⭐将流转化为JobGraph
   StreamGraph streamGraph = getStreamGraph();
   streamGraph.setJobName(jobName);

   JobGraph jobGraph = streamGraph.getJobGraph();
   jobGraph.setAllowQueuedScheduling(true);

   //⭐封装配置信息
   Configuration configuration = new Configuration();
   configuration.addAll(jobGraph.getJobConfiguration());
   configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

   // ⭐加载和封装user定义的设置参数
   configuration.addAll(this.configuration);

   if (!configuration.contains(RestOptions.BIND_PORT)) {
      configuration.setString(RestOptions.BIND_PORT, "0");
   }

   //⭐设置每个task manager中的slot数量
   int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

   //⭐配置MiniCluster的参数
   MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
      .setConfiguration(configuration)
      .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
      .build();

   if (LOG.isInfoEnabled()) {
      LOG.info("Running job on local embedded Flink mini cluster");
   }

   //⭐创建miniCluster
   MiniCluster miniCluster = new MiniCluster(cfg);

   //⭐启动miniCluster,返回jobGraph的执行结果
   try {
      miniCluster.start();
      configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

      ⭐return miniCluster.executeJobBlocking(jobGraph);
   }
   finally {
      transformations.clear();
      miniCluster.close();
   }
}

由上面的源码可见,execute中执行的内容大致就是创建JobGraph、封装参数信息、创建cluster、把JobGraph提交到cluster等待返回执行结果,当然最后别忘了清理内存和关闭集群,也就是说在execute之前,都是客户端在进行flink程序的封装,只有通过execute()提交到cluster才是程序真正的执行。这也是flink程序懒加载的真相。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5135980.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存