之前给大家介绍了DataStream API中 Environment 和 Transformation 连个体系的源代码,今天来了小插曲,给大家宏观介绍下 Flink 作业的提交流程,希望对大家有帮助。
一、DataStream 作业提交流程
1)、首先,先给大家展示下流程图:
2)、提交流程说明:
FlinkCli 先创建一个 Flink 环境变量
然后将环境变量存入到ThreadLocal中
在启动 Flink 作业jar包的 main 方法
Flink 应用程序通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取到相应的执行环境变量
Flink 应用程序将用户编写的作业转换成 jobGraph 提交给Flink 集群
3)、Flink 作业以哪种方式提交,取决于 StreamExecutionEnvironment 的配置信息;
起到主要作用的配置参数是 execution.target;
execution.target 取值:
remote
local
yarn-per-job
yarn-session
kubernetes-session
yarn-application
kubernetes-application
StreamExecutionEnvironment 会根据 execution.target 配置的不同取值创建相应的 PipelineExecutorFactory, 再由 PipelineExecutorFactory 创建相应的 PipelineExecutor, PipelineExecutor执行相应的作业提交工作;
源代码探究:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute()
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(String jobName)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamGraph streamGraph)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamGraph streamGraph)
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(final Configuration configuration) (见 代码 3-1)
ExecutorFactory 举例,org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory,(见代码 3-2)
代码 3-1
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
// 通过 java SPI 技术加载 实现了 PipelineExecutorFactory 接口的类
final ServiceLoader loader =
ServiceLoader.load(PipelineExecutorFactory.class);
final ListcompatibleFactories = new ArrayList<>(); final Iterator factories = loader.iterator(); while (factories.hasNext()) { try { final PipelineExecutorFactory factory = factories.next(); // 根据 execution.target 的取值 过滤出匹配到的 PipelineExecutorFactory if (factory != null && factory.isCompatibleWith(configuration)) { compatibleFactories.add(factory); } } catch (Throwable e) {} } if (compatibleFactories.size() > 1) {} if (compatibleFactories.isEmpty()) {} return compatibleFactories.get(0);
}
代码 3-2
@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {
@Override public boolean isCompatibleWith(@Nonnull final Configuration configuration) { return YarnSessionClusterExecutor.NAME.equalsIgnoreCase( configuration.get(DeploymentOptions.TARGET)); }
}
// 配置选项
public static final ConfigOption TARGET =
key(“execution.target”)
4)、FlinkCli 创建 Flink 环境变量相关流程:
org.apache.flink.client.cli.CliFrontend.main()
org.apache.flink.client.cli.CliFrontend.executeProgram()
org.apache.flink.client.ClientUtils.executeProgram()
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info( "Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED)); ContextEnvironment.setAsContext( executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout);
// 设置流环境变量
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try { // 启动用户程序的main方法 program.invokeInteractiveModeForExecution(); } finally { ContextEnvironment.unsetAsContext(); StreamContextEnvironment.unsetAsContext(); } } finally { Thread.currentThread().setContextClassLoader(contextClassLoader); }
}
5)、StreamExecutionEnvironment.getExecutionEnvironment() 获取执行环境的逻辑:
先从 threadLocal 获取环境变量
如果 threadLocal 中没有相应的环境变量,则创建一个本地环境变量
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
public static Optional resolveFactory(ThreadLocal threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;
return Optional.ofNullable(factory);
}
二、Flink Table
1)、flink Sql 作业提交流程
2)、提交流程说明
TableEnvironmentImpl 在创建的过程中创建了 Executor , Executorbase 中包含了StreamExecutionEnvironment 的实例, StreamExecutionEnvironment 的实例由 StreamExecutionEnvironment .getExecutionEnvironment() 方法创建。
TableEnvironmentImpl 作业的提交依赖 StreamExecutionEnvironment 的作业提交流程。
TableEnvironmentImpl 借助Parser组件将 SQL 语句转换成 Operation,然后借助 Planner组件将Operation转换成 List。
使用StreamExecutionEnvironment 将 List 转换成 StreamGraph。
后续 *** 作与DataStream提交流程一样。
3)、 TableEnvironmentImpl .executeSql() 执行逻辑:
Sql 解析, 将Sql语句解析为 List 变量;
Transformation转换,将 List 转换为 List
PipeLine转换, 将List
4)、TableEnvironmentImpl 创建过程:
ModuleManager 的创建
CatalogManager 的创建
FunctionCatalog 的创建
Executor (执行环境)的创建, 先通过 java SPI 加载 Executor 工厂, 通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为 org.apache.flink.table.planner.delegation.BlinkExecutorFactory
Planner的创建(包括Parser的构造),先通过 java SPI 加载 Planner 工厂,通过EnvironmentSettings.Builder.useBlinkPlanner() 指定为org.apache.flink.table.planner.delegation.BlinkPlannerFactory
构造TableEnvironmentImpl
5)、Sql解析 (Blink Planner: StreamPlanner / BatchPlanner)
基本流程:
Sql语句解析成Sql 抽象语法树
Planner对sql 语法树进行验证
将验证过的语法树转换成关系代数树
将关系代数树封装成Flink对应的Operation
public List parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
Calsite :Sql 解析框架
SqlNode 代表Sql 抽象语法树中的节点,CalciteParser 内部使用 SqlParser 将Sql语句解析成Sql 抽象语法树。
Operation (Flink Table API中抽象出来的概念) 代表任意类型的Sql *** 作行为,例如 Select 、Insert、Drop 等sql *** 作可以表示为QueryOperation、CatalogSinkModifyOperation、DropOperation。FlinkPlannerImpl内部使用 Calsite 的 SqlToRelConverter 将验证后的抽象语法树转换成关系代数树。
6)、Operation 转换为 Transformation 逻辑 (Blink Planner : StreamPlanner / BatchPlanner)
基本流程:
从Operation中 获取到 关系代数树
根据优化规则优化关系代数树
生成物理执行计划
将物理执行计划转换成 List
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[]]
}
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)