StreamExecutionEnvironment is the environment in which a stream program executes. Its subclass LocalStreamEnvironment causes the program to run in the current JVM, while its subclass RemoteStreamEnvironment causes the program to run on a remote cluster. — Flink official Javadoc

First, let's look at what a typical Flink word-count program looks like.
```java
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Port number for the socket
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument given, falling back to the default 9000");
            port = 9000;
        }

        // ⭐ Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ⭐ Connect to the socket to obtain the input data
        DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");

        // Process the data
        DataStream<WordWithCount> windowCount = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                    String[] splits = value.split("\\s");
                    for (String word : splits) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            }) // flatten each line into (word, count) records
            .keyBy("word") // group records that share the same word
            .timeWindow(Time.seconds(2), Time.seconds(1)) // window size and slide interval
            .sum("count");

        // Print the result to the console, using a parallelism of 1
        windowCount.print().setParallelism(1);

        // ⭐ Flink is lazily evaluated, so execute() must be called before any of the above runs
        env.execute("streaming word count");
    }
}
```
Note the parts of the code marked with ⭐: the first step of a Flink program is to obtain a StreamExecutionEnvironment (hereafter SEE); we then use the methods SEE provides to add a source and obtain the first DataStream, and finally call SEE's execute() method to run the program. These are the three roles SEE plays here.
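For orientation, here is a minimal sketch of the three ways to obtain an environment; the hostname, port, and jar path below are placeholders, not values from this article:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// getExecutionEnvironment() picks the right environment for the current
// context automatically (local when run from the IDE, remote when submitted).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Force a LocalStreamEnvironment (runs in the current JVM):
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Force a RemoteStreamEnvironment (submits to a remote cluster):
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/usercode.jar");
```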
The Flink Javadoc puts it this way: "The StreamExecutionEnvironment provides methods to control the job execution (such as setting the parallelism or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access)." So how is the SEE class actually implemented? Starting from the Flink source code, this article works through what SEE is made of, step by step.

From the code below, let's first see which fields SEE has.
```java
public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";

private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;

private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;

private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

private final ExecutionConfig config = new ExecutionConfig();

private final CheckpointConfig checkpointCfg = new CheckpointConfig();

protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;

protected boolean isChainingEnabled = true;

private StateBackend defaultStateBackend;

private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;

protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
```
We can see that, besides the usual housekeeping fields, there are two important ones: ExecutionConfig and CheckpointConfig.

The fields of SEE can be roughly summarized by the figure below.

Now let's look at what parameters each of these two fields can define, starting with ExecutionConfig:
```java
private static final long serialVersionUID = 1L;

@Deprecated
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

public static final int PARALLELISM_DEFAULT = -1;

public static final int PARALLELISM_UNKNOWN = -2;

private static final long DEFAULT_RESTART_DELAY = 10000L;

// --------------------------------------------------------------------------------------------

private ExecutionMode executionMode = ExecutionMode.PIPELINED;

private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;

private int parallelism = PARALLELISM_DEFAULT;

private int maxParallelism = -1;

@Deprecated
private int numberOfExecutionRetries = -1;

private boolean forceKryo = false;

private boolean disableGenericTypes = false;

private boolean objectReuse = false;

private boolean autoTypeRegistrationEnabled = true;

private boolean forceAvro = false;

private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;

private boolean printProgressDuringExecution = true;

private long autoWatermarkInterval = 0;

private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();

private boolean isLatencyTrackingConfigured = false;

@Deprecated
private long executionRetryDelay = DEFAULT_RESTART_DELAY;

private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
    new RestartStrategies.FallbackRestartStrategyConfiguration();

private long taskCancellationIntervalMillis = -1;

private long taskCancellationTimeoutMillis = -1;

private boolean useSnapshotCompression = false;

private boolean failTaskOnCheckpointError = true;

private InputDependencyConstraint defaultInputDependencyConstraint = InputDependencyConstraint.ANY;

// ------------------------------- User code values --------------------------------------------

private GlobalJobParameters globalJobParameters;

// Serializers and types registered with Kryo and the PojoSerializer.
// We store them in linked maps/sets to ensure they are registered in order in all Kryo instances.

private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();

private LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();

private LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();

private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
```
From the above, ExecutionConfig covers settings such as the job's default parallelism, the number of retries after a failed execution and the delay between restart attempts, the execution mode (streaming or batch), enabling or disabling the "closure cleaner", and whether types and serializers may be registered to handle basic types and POJO types more efficiently.
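To make this concrete, here is a small sketch of tuning some of these settings through SEE; the particular values are arbitrary illustrations, not recommendations:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

config.setParallelism(4);                         // sets the `parallelism` field
config.setExecutionMode(ExecutionMode.PIPELINED); // sets the `executionMode` field
config.setAutoWatermarkInterval(200L);            // sets `autoWatermarkInterval` (ms)
config.disableClosureCleaner();                   // turns the closure cleaner off

// Restart strategy: retry 3 times, waiting 10 s between attempts;
// this populates `restartStrategyConfiguration`
config.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
```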
Next, let's see what fields CheckpointConfig has:
```java
private static final long serialVersionUID = -750378776078908147L;

public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;

public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;

public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;

public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;

// ------------------------------------------------------------------------

private CheckpointingMode checkpointingMode = DEFAULT_MODE;

private long checkpointInterval = -1; // disabled

private long checkpointTimeout = DEFAULT_TIMEOUT;

private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;

private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;

private boolean forceCheckpointing;

private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;

private boolean failOnCheckpointingErrors = true;
```
As expected, everything CheckpointConfig defines is a checkpoint-related parameter or policy.

From the fields of these three classes, we can see that StreamExecutionEnvironment + ExecutionConfig + CheckpointConfig together define Flink's top-level environment configuration and policy choices.
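For illustration, a sketch of how these checkpoint parameters are typically set through SEE; the interval and pause values below are arbitrary:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 s; this sets the `checkpointInterval` field (default -1 = disabled)
env.enableCheckpointing(60_000L);

CheckpointConfig ckCfg = env.getCheckpointConfig();
ckCfg.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // DEFAULT_MODE above
ckCfg.setCheckpointTimeout(10 * 60 * 1000);                 // DEFAULT_TIMEOUT above
ckCfg.setMinPauseBetweenCheckpoints(500L);
ckCfg.setMaxConcurrentCheckpoints(1);

// Keep externalized checkpoints when the job is cancelled;
// this populates the `externalizedCheckpointCleanup` field
ckCfg.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```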
Now let's go back and look at the important methods SEE defines. (The various getters and setters are omitted, most comments are translated, and the key points are marked with ⭐.)

First, the methods SEE defines for obtaining a source:
```java
public DataStreamSource<Long> generateSequence(long from, long to) {
    if (from > to) {
        throw new IllegalArgumentException("Start of sequence must not be greater than the end");
    }
    // ⭐ ultimately delegates to addSource()
    return addSource(new StatefulSequenceSource(from, to), "Sequence Source");
}
```
```java
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
    if (data.length == 0) {
        throw new IllegalArgumentException("fromElements needs at least one element as argument");
    }

    TypeInformation<OUT> typeInfo;
    try {
        typeInfo = TypeExtractor.getForObject(data[0]);
    } catch (Exception e) {
        throw new RuntimeException("Could not create TypeInformation for type "
                + data[0].getClass().getName()
                + "; please specify the TypeInformation manually via "
                + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
    }
    // ⭐ ultimately delegates to fromCollection()
    return fromCollection(Arrays.asList(data), typeInfo);
}
```
```java
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
    Preconditions.checkNotNull(data, "Collection must not be null");

    // must not have null elements and mixed elements
    FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

    SourceFunction<OUT> function;
    try {
        function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
    } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
    }
    // ⭐ ultimately delegates to addSource()
    return addSource(function, "Collection Source", typeInfo).setParallelism(1);
}
```
```java
private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator,
        TypeInformation<OUT> typeInfo, String operatorName) {
    // ⭐ ultimately delegates to addSource()
    return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
}
```
```java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    // ⭐ ultimately delegates to addSource()
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream");
}
```
```java
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath),
        "The file path must not be null or blank.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    // ⭐ ultimately delegates to readFile()
    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
```
```java
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
        String filePath, FileProcessingMode watchType, long interval,
        TypeInformation<OUT> typeInformation) {
    Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath),
        "The file path must not be null or blank.");

    inputFormat.setFilePath(filePath);
    // ⭐ ultimately delegates to createFileInput()
    return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
}
```
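Before tracing the call relationships, here is a quick usage sketch of these source methods; the host, port, and file path are placeholders:

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Long> seq = env.generateSequence(1, 100);                        // sequence source
DataStreamSource<String> elems = env.fromElements("a", "b", "c");                 // elements source
DataStreamSource<String> coll = env.fromCollection(Arrays.asList("x", "y", "z")); // collection source
DataStreamSource<String> socket = env.socketTextStream("localhost", 9000, "\n");  // socket source
DataStreamSource<String> file = env.readTextFile("/tmp/input.txt");               // file source
```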
From the call relationships among these source-adding methods, we arrive at the figure below.

As we can see, it is ultimately the two methods addSource() and createFileInput() that create a new DataStream, so let's look at how these two are implemented.
```java
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
        String sourceName, TypeInformation<OUT> typeInfo) {

    // ⭐ note: typeInfo may be null; the type is then extracted from the
    // SourceFunction, or created for it
    if (typeInfo == null) {
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                        SourceFunction.class, function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    // ⭐ create the source operator
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function);
    }

    // ⭐ create and return the DataStream (note: DataStreamSource is a subclass of DataStream)
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
```
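To illustrate the type-extraction branch above, here is a hypothetical custom source; because it implements SourceFunction<Long>, TypeExtractor.createTypeInfo(...) can derive the output type even though we pass no TypeInformation:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits an increasing counter once per second until cancelled.
DataStreamSource<Long> ticks = env.addSource(new SourceFunction<Long>() {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 0;
        while (running) {
            ctx.collect(n++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});
```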
```java
private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
        TypeInformation<OUT> typeInfo, String sourceName,
        FileProcessingMode monitoringMode, long interval) {

    // ⭐ null checks on the parameters
    Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
    Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
    Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
    Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

    // ⭐ unless the FileProcessingMode is PROCESS_ONCE (a single full scan of the files),
    // the scan interval must not be smaller than the minimum allowed interval
    // ⭐ Preconditions.checkArgument() verifies that its first argument is true, and throws otherwise
    Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
            interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
        "The path monitoring interval cannot be less than " +
            ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

    // ⭐ wrap the parameters
    ContinuousFileMonitoringFunction<OUT> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

    ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
            .transform("Split Reader: " + sourceName, typeInfo, reader);

    // ⭐ create and return the DataStream
    return new DataStreamSource<>(source);
}
```
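To see createFileInput() exercised end to end, a sketch of calling readFile() with PROCESS_CONTINUOUSLY; the directory is a placeholder, and the interval must respect the minimum checked above:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Continuously monitor a directory, re-scanning it every 1000 ms; with
// PROCESS_ONCE instead, the directory would be read once and the source would finish.
TextInputFormat format = new TextInputFormat(new Path("/tmp/watched-dir"));
DataStream<String> lines = env.readFile(
        format,
        "/tmp/watched-dir",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000L,
        BasicTypeInfo.STRING_TYPE_INFO);
```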
That covers the methods SEE uses to create a DataStream; the concrete implementation of DataStream itself is the subject of the next article.

Finally, let's look at SEE's execute() method. Since the execute() method declared in the SEE class is overridden by its concrete subclasses, we will skip the base declaration and go straight to a subclass implementation. Below we use one subclass, LocalStreamEnvironment, to explore what execute() does.
```java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // ⭐ turn the stream into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    // ⭐ wrap the configuration
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // ⭐ load and wrap the user-defined settings
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    // ⭐ set the number of slots per task manager
    int numSlotsPerTaskManager = configuration.getInteger(
        TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    // ⭐ configure the MiniCluster
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    // ⭐ create the MiniCluster
    MiniCluster miniCluster = new MiniCluster(cfg);

    // ⭐ start the MiniCluster and return the result of executing the JobGraph
    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
```
From this source we can see what execute() roughly does: build the JobGraph, wrap the configuration, create a cluster, submit the JobGraph to the cluster, and wait for the execution result, not forgetting to clear the transformations and shut the cluster down at the end. In other words, everything before execute() is just the client assembling the Flink program; only when execute() submits it to a cluster does the program truly run. This is the truth behind Flink's lazy evaluation.
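A minimal sketch of that laziness, assuming nothing beyond the APIs shown earlier:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Nothing runs yet: each call below only appends a StreamTransformation
// to SEE's `transformations` list on the client side.
env.fromElements(1, 2, 3)
   .map(x -> x * 2)
   .print();

// Only now is the StreamGraph/JobGraph built and submitted for execution.
env.execute("lazy evaluation demo");
```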