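In the previous article we covered how the client turns user code into a JobGraph and submits it to the Dispatcher through the Dispatcher's gateway. The Dispatcher then uses the JobManagerRunnerFactory to create a JobManagerRunner instance, and that JobManagerRunner starts the JobManager service. Under the hood, the JobManager service is implemented mainly by the JobMaster. The JobMaster is responsible for the job's entire lifecycle and for scheduling its tasks.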
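Overall JobGraph submission flow:

1. Through ClusterClient, the Flink client calls the Dispatcher's submitJob(JobGraph) RPC method and hands over the JobGraph it has built.
2. The Dispatcher runs Dispatcher.internalSubmitJob(). This method calls waitForTerminatingJobManager() to wait until any JobManager that is still terminating for the same job ID has shut down. It then calls persistAndRunJob() to persist and run the JobGraph.
3. persistAndRunJob() calls runJob(JobGraph). runJob() creates a JobManagerRunner through the JobManagerRunnerFactory and starts it with JobManagerRunner.start().
4. JobManagerRunner supports high availability (HA), so it first takes part in leader election via leaderElectionService.start(this). Once the current JobManagerRunner is granted leadership, the election service calls back JobManagerRunner.grantLeadership(UUID), which in turn calls startJobMaster(UUID).
5. startJobMaster(UUID) starts the JobMaster via JobMasterService.start(JobMasterId). At this point the JobMaster's RPC service is up and serving requests. The JobMaster then asynchronously calls JobMaster.startJobExecution() to begin scheduling and running the JobGraph.
6. If the submission succeeds, the Dispatcher returns an Acknowledge to ClusterClient, which passes it back to the client.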
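The sketch below makes the call order above concrete by modelling the chain as CompletableFuture stages. It is a minimal sketch under simplifying assumptions. SubmissionFlowSketch and all of its methods are hypothetical stand-ins, not Flink APIs. Leader election and the JobMaster start run inline here, whereas in Flink grantLeadership() arrives as an asynchronous callback from the election service. The ACK is also produced only after every step, which is a further simplification.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the submission chain; none of these names are Flink APIs.
final class SubmissionFlowSketch {
    // Records every step so the call order can be inspected afterwards.
    private final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    // Stands in for Dispatcher.submitJob(): completes with an ACK once the chain has finished.
    CompletableFuture<String> submitJob(String jobId) {
        steps.add("submitJob " + jobId);
        return internalSubmitJob(jobId).thenApply(ignored -> "ACK " + jobId);
    }

    // Stands in for internalSubmitJob(); the real method first waits for a terminating JobManager.
    private CompletableFuture<Void> internalSubmitJob(String jobId) {
        steps.add("internalSubmitJob");
        return CompletableFuture.completedFuture(jobId).thenCompose(this::persistAndRunJob);
    }

    // Stands in for persistAndRunJob(): persist first, then run.
    private CompletableFuture<Void> persistAndRunJob(String jobId) {
        steps.add("persist JobGraph");
        return runJob(jobId);
    }

    // Stands in for runJob(): create the runner, then start it asynchronously.
    private CompletableFuture<Void> runJob(String jobId) {
        steps.add("create JobManagerRunner");
        return CompletableFuture.runAsync(this::startJobManagerRunner);
    }

    // Simplification: leader election and the JobMaster start happen inline here.
    private void startJobManagerRunner() {
        steps.add("leader election");
        steps.add("grantLeadership");
        steps.add("start JobMaster");
        steps.add("startJobExecution");
    }

    List<String> steps() {
        synchronized (steps) {
            return new ArrayList<>(steps);
        }
    }

    public static void main(String[] args) {
        SubmissionFlowSketch flow = new SubmissionFlowSketch();
        System.out.println(flow.submitJob("job-1").join());
        flow.steps().forEach(System.out::println);
    }
}
```

Running main prints the ACK followed by the eight recorded steps, in the order the list above describes.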
The Dispatcher persists the JobGraph and creates the corresponding JobManagerRunner. It then uses that JobManagerRunner to start the JobMaster, which executes the job.
```java
public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<DispatcherId> implements DispatcherGateway {

    // Step 1: ClusterClient calls this Dispatcher RPC method to upload the JobGraph
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        try {
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
            } else if (isPartialResourceConfigured(jobGraph)) {
                return FutureUtils.completedExceptionally(
                    new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                        "resources configured. The limitation will be removed in future versions."));
            } else {
                // Enter the actual JobGraph submission logic
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    // Step 2
    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        // Wait for any JobManager of the same job ID that is still terminating,
        // then asynchronously persist and run the JobGraph via persistAndRunJob()
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

        //...
        // Finally returns a Future holding the ACK
    }

    // Step 3: persist and run the JobGraph
    private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
        // Persist the JobGraph; with ZooKeeper-based HA, ZooKeeperJobGraphStore writes it to ZK
        // so that the job can be recovered from there after a failure
        jobGraphWriter.putJobGraph(jobGraph);

        // Run the JobGraph
        final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

        // When runJobFuture completes with an error, remove the persisted JobGraph again
        return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
            if (throwable != null) {
                jobGraphWriter.removeJobGraph(jobGraph.getJobID());
            }
        }));
    }

    // Step 4: create and start the JobManagerRunner; startJobManagerRunner() calls JobManagerRunner.start()
    private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

        // Create the JobManagerRunner through the JobManagerRunnerFactory
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

        // Register the runner future so that the same job is not created twice
        jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

        // Start the JobManagerRunner asynchronously and handle failures
        return jobManagerRunnerFuture
            .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        jobManagerRunnerFutures.remove(jobGraph.getJobID());
                    }
                },
                getMainThreadExecutor());
    }
}
```
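The bookkeeping in runJob() follows a simple pattern. The runner future is registered under its job ID before the runner is started, and it is removed again if starting fails. This lets the Dispatcher reject a second submission of a running job while still allowing a retry after a failed start. The following is a hypothetical, single-threaded sketch of that pattern. RunnerRegistrySketch is not a Flink class, and it folds the isDuplicateJob() check from submitJob() into runJob() for brevity. It assumes all calls happen on one thread, as the Dispatcher's main-thread model suggests, so it uses a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the runner-future bookkeeping in Dispatcher.runJob(); not Flink code.
// Assumes single-threaded access.
final class RunnerRegistrySketch {
    private final Map<String, CompletableFuture<String>> runnerFutures = new HashMap<>();

    boolean isDuplicate(String jobId) {
        return runnerFutures.containsKey(jobId);
    }

    // failOnStart simulates the runner failing to start.
    CompletableFuture<Void> runJob(String jobId, boolean failOnStart) {
        if (isDuplicate(jobId)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Duplicate job " + jobId));
        }
        CompletableFuture<String> runnerFuture = CompletableFuture.completedFuture("runner-" + jobId);
        // Register before starting, so a resubmission of the same job ID is rejected while registered.
        runnerFutures.put(jobId, runnerFuture);
        return runnerFuture
                .thenApply(runner -> {
                    if (failOnStart) {
                        throw new IllegalStateException("Could not start " + runner);
                    }
                    return runner;
                })
                .<Void>thenApply(ignored -> null)
                .whenComplete((ignored, throwable) -> {
                    // On failure, forget the job so that it can be submitted again.
                    if (throwable != null) {
                        runnerFutures.remove(jobId);
                    }
                });
    }
}
```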
The JobManagerRunner runs leader election according to the configured HA strategy. It then starts the JobMaster, which schedules and executes the job.
```java
public class JobManagerRunnerImpl implements LeaderContender, OnCompletionActions, JobManagerRunner {

    // Step 1: HA leader election; on success the election service calls back grantLeadership()
    @Override
    public void start() throws Exception {
        try {
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // Step 2: after winning leadership, verify the job's scheduling status,
    // then call startJobMaster() to start the JobMaster
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    // Step 3: start the JobMaster, which schedules and executes the job
    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());

        try {
            // Register the job as running
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // Start the JobMaster, which schedules and executes the whole JobGraph
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        // Once started, asynchronously confirm the leader session if this JobManagerRunner is still the leader
        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
                leaderSessionId,
                jobMasterService.getAddress(),
                currentLeaderGatewayFuture),
            executor);
    }
}
```
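The start()/grantLeadership() handshake is a callback pattern. The runner registers itself with the election service and starts the JobMaster only when it is called back with a leader session ID. After the asynchronous start, it confirms that session only if it is still the leader. The sketch below imitates that shape with made-up types (LeaderContenderSketch, ImmediateElectionService, RunnerSketch). It is not Flink's LeaderElectionService API. The toy election service grants leadership immediately, roughly like a non-HA setup. The JobMaster start is simulated by a future that the caller completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of a leader contender; not Flink's LeaderContender interface.
interface LeaderContenderSketch {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

// Toy election service that grants leadership immediately (roughly a non-HA setup).
// A ZooKeeper-backed service would only call back after actually winning the election.
final class ImmediateElectionService {
    private UUID confirmedSessionId;

    void start(LeaderContenderSketch contender) {
        contender.grantLeadership(UUID.randomUUID());
    }

    synchronized void confirmLeadership(UUID leaderSessionId) {
        confirmedSessionId = leaderSessionId;
    }

    synchronized UUID confirmedSessionId() {
        return confirmedSessionId;
    }
}

final class RunnerSketch implements LeaderContenderSketch {
    private final ImmediateElectionService election;
    // Completed by the caller to simulate the JobMaster finishing its asynchronous start.
    final CompletableFuture<Void> jobMasterStarted = new CompletableFuture<>();
    private final List<String> log = new ArrayList<>();
    private UUID currentSessionId; // null while this runner is not the leader

    RunnerSketch(ImmediateElectionService election) {
        this.election = election;
    }

    // Like JobManagerRunner.start(): register with the election service and wait for the callback.
    void start() {
        election.start(this);
    }

    @Override
    public synchronized void grantLeadership(UUID leaderSessionId) {
        currentSessionId = leaderSessionId;
        log.add("start JobMaster");
        // Confirm once the start completes, but only if still leader at that point.
        jobMasterStarted.thenRun(() -> confirmIfStillLeader(leaderSessionId));
    }

    @Override
    public synchronized void revokeLeadership() {
        currentSessionId = null;
        log.add("suspend JobMaster");
    }

    private synchronized void confirmIfStillLeader(UUID leaderSessionId) {
        if (leaderSessionId.equals(currentSessionId)) {
            election.confirmLeadership(leaderSessionId);
            log.add("confirmed");
        } else {
            log.add("not confirmed");
        }
    }

    synchronized List<String> log() {
        return new ArrayList<>(log);
    }
}
```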
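Once started, the JobMaster sets up heartbeats with the ResourceManager (RM) and starts the SlotPool service. It also creates the scheduler, converting the JobGraph into an ExecutionGraph in the process. The scheduler then schedules the job.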
```java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

    // Step 1
    public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // Start the JobMaster's RPC service
        start();
        // Asynchronously start executing the assigned job on this JobMaster
        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }

    // Step 2
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        // Make sure this runs in the RPC endpoint's main thread; throws otherwise
        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        // If the fencing token already equals the new JobMasterId, the job was already started with it: just return an Ack
        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
            return Acknowledge.get();
        }

        // Use the new JobMasterId as the fencing token for RPC communication
        setNewFencingToken(newJobMasterId);

        // Start the heartbeat services (with the TMs and the RM), start the SlotPool, and connect to the RM
        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

        // Assign a scheduler to the job and schedule its tasks
        resetAndStartScheduler();

        return Acknowledge.get();
    }

    // Step 3: create the scheduler and schedule the job
    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        // Completes once a scheduler has been assigned
        final CompletableFuture<Void> schedulerAssignedFuture;

        if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            // New job: hand the main thread executor to the existing schedulerNG
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
        } else {
            // Restart: reset the ExecutionGraph and create a new scheduler for the job
            suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            // Creating the scheduler also converts the JobGraph into an ExecutionGraph
            final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);

            schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
                (ignored, throwable) -> {
                    newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                    assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                    return null;
                }
            );
        }

        // Once a scheduler is assigned, start scheduling the vertices of the ExecutionGraph
        schedulerAssignedFuture.thenRun(this::startScheduling);
    }

    // Let the scheduler schedule and execute the job
    private void startScheduling() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        schedulerNG.registerJobStatusListener(jobStatusListener);

        schedulerNG.startScheduling();
    }
}
```
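The fencing-token check at the top of startJobExecution() makes a repeated start with the same JobMasterId a no-op. The token itself is what fenced RPCs are validated against. The sketch below illustrates both ideas under the assumption that a fenced call is accepted only when the caller's token equals the current one. FencedEndpointSketch is a hypothetical class, not Flink's FencedRpcEndpoint.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical sketch of fencing-token handling; not Flink's FencedRpcEndpoint.
final class FencedEndpointSketch {
    private UUID fencingToken;   // null until the first start
    private int schedulerStarts; // how often scheduling was actually started

    // Mirrors the check in startJobExecution(): a repeated start with the same id is a no-op.
    synchronized String startJobExecution(UUID newJobMasterId) {
        Objects.requireNonNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(fencingToken, newJobMasterId)) {
            return "already started";
        }
        fencingToken = newJobMasterId; // like setNewFencingToken()
        schedulerStarts++;             // stands in for starting services and the scheduler
        return "started";
    }

    // Assumed rule: a fenced RPC is accepted only if the caller presents the current token.
    synchronized boolean acceptRpc(UUID callerToken) {
        return fencingToken != null && fencingToken.equals(callerToken);
    }

    synchronized int schedulerStarts() {
        return schedulerStarts;
    }
}
```

A message carrying the token of an earlier leadership session is rejected by acceptRpc(). This is why a new leader session ID, and with it a new JobMasterId, fences off a stale JobMaster.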
Scheduling and executing the ExecutionGraph
```java
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {

    // Step 1
    @Override
    public final void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();
        registerJobMetrics();
        startSchedulingInternal();
    }

    // Step 2
    @Override
    protected void startSchedulingInternal() {
        log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
        prepareExecutionGraphForNgScheduling();
        // Schedule the job according to the configured scheduling strategy:
        // eager: used for streaming; requests all resources at once and fails if they are insufficient
        // lazy: used for batch; schedules in stages, starting from the source tasks
        schedulingStrategy.startScheduling();
    }
}
```
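The two strategies named in the comments differ mainly in what gets deployed first. The sketch below is a deliberately simplified model of that difference. Vertices are plain names, and a lazily scheduled vertex becomes ready once all of its inputs have finished. That readiness rule is an assumption made for illustration; Flink's lazy strategy reacts to result partitions becoming consumable. SchedulingStrategySketch is hypothetical, and it does not model slot requests or the "fail if resources are insufficient" behaviour of eager scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of eager vs. lazy-from-sources scheduling; not Flink code.
// "inputs" maps a vertex to its upstream vertices.
final class SchedulingStrategySketch {

    // Eager (streaming): every vertex is deployed in the first and only round.
    static List<String> eagerFirstRound(List<String> vertices) {
        return new ArrayList<>(vertices);
    }

    // Lazy from sources (batch): the first round contains only vertices without inputs.
    static List<String> lazyFirstRound(List<String> vertices, Map<String, List<String>> inputs) {
        List<String> sources = new ArrayList<>();
        for (String v : vertices) {
            if (inputs.getOrDefault(v, List.of()).isEmpty()) {
                sources.add(v);
            }
        }
        return sources;
    }

    // Later lazy rounds: a not-yet-deployed vertex is ready once all its inputs have finished.
    static List<String> lazyNextRound(List<String> vertices, Map<String, List<String>> inputs,
                                      Set<String> deployed, Set<String> finished) {
        List<String> ready = new ArrayList<>();
        for (String v : vertices) {
            List<String> in = inputs.getOrDefault(v, List.of());
            if (!deployed.contains(v) && !in.isEmpty() && finished.containsAll(in)) {
                ready.add(v);
            }
        }
        return ready;
    }
}
```

For a pipeline source → map → sink, the eager strategy deploys all three vertices at once. The lazy strategy deploys only source first, and deploys map after source has finished.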
```java
public class EagerSchedulingStrategy implements SchedulingStrategy {

    @Override
    public void startScheduling() {
        allocateSlotsAndDeploy(SchedulingStrategyUtils.getAllVertexIdsFromTopology(schedulingTopology));
    }

    private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
        // Convert the set of ExecutionVertexIDs to deploy into a list of
        // ExecutionVertexDeploymentOptions, in topological order
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
            SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
                schedulingTopology,
                verticesToDeploy,
                id -> deploymentOption);
        // Deploy the ExecutionVertices via DefaultScheduler.allocateSlotsAndDeploy()
        schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
    }
}
```
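createExecutionVertexDeploymentOptionsInTopologicalOrder() returns the requested vertices with upstream vertices ahead of downstream ones. The sketch below assumes that the scheduling topology already iterates its vertices in topological order. Under that assumption, putting an arbitrary subset into topological order needs no real sort: filtering that iteration while keeping its order is enough. TopologicalFilterSketch is a hypothetical helper, not Flink's SchedulingStrategyUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper; assumes topologyVertices is already in topological order.
final class TopologicalFilterSketch {

    // Keeps only the requested vertex IDs, preserving the topology's (topological) iteration order.
    static List<String> inTopologicalOrder(Iterable<String> topologyVertices, Set<String> verticesToDeploy) {
        List<String> ordered = new ArrayList<>();
        for (String id : topologyVertices) {
            if (verticesToDeploy.contains(id)) {
                ordered.add(id);
            }
        }
        return ordered;
    }
}
```

The input set (here an unordered Set) says nothing about order. The order comes entirely from the topology, which is why the result is stable no matter how the set was built.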
DefaultScheduler ultimately calls deployTaskSafe() to deploy the ExecutionVertex identified by a given ExecutionVertexID.
```java
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {

    private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
        try {
            // Look up the ExecutionVertex
            final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
            // Deploy it through executionVertexOperations
            executionVertexOperations.deploy(executionVertex);
        } catch (Throwable e) {
            handleTaskDeploymentFailure(executionVertexId, e);
        }
    }
}
```
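The "safe" in deployTaskSafe() means that any Throwable raised while deploying one vertex is routed to a failure handler instead of propagating to the caller. The minimal sketch below shows that this keeps a failure in one vertex from aborting deployment of the others. It records failures in a list, whereas in Flink handleTaskDeploymentFailure() hands the failure on to the scheduler's failure handling. SafeDeploySketch and its deployAll() loop are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a "safe" per-vertex deployment wrapper; not Flink code.
final class SafeDeploySketch {
    final List<String> deployed = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    void deployAll(List<String> vertexIds, Consumer<String> deployer) {
        for (String id : vertexIds) {
            deployTaskSafe(id, deployer);
        }
    }

    private void deployTaskSafe(String vertexId, Consumer<String> deployer) {
        try {
            deployer.accept(vertexId);
            deployed.add(vertexId);
        } catch (Throwable t) {
            // Never rethrow: the failure is handled per vertex.
            handleTaskDeploymentFailure(vertexId, t);
        }
    }

    private void handleTaskDeploymentFailure(String vertexId, Throwable t) {
        failed.add(vertexId + ": " + t.getMessage());
    }
}
```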
The deploy() method of executionVertexOperations ultimately calls ExecutionVertex.deploy(). That method deploys the ExecutionVertex's task to the assigned TaskExecutor and runs it there.
Each ExecutionVertex holds an Execution object, its current execution attempt. Calling Execution.deploy() deploys and runs that Execution.
```java
public class ExecutionVertex implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {

    public void deploy() throws JobException {
        currentExecution.deploy();
    }
}
```
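The split between ExecutionVertex and Execution works as follows. The vertex is the stable node of the ExecutionGraph, while each Execution is one attempt to run it; this is where the "attempt #%d" in the deploy log comes from. The sketch below models the vertex delegating deploy() to its current attempt and replacing that attempt with a new, higher-numbered one on restart. VertexSketch and resetForNewAttempt() are hypothetical names, and the real restart path involves state checks this sketch omits.

```java
// Hypothetical sketch of a vertex delegating to its current execution attempt; not Flink code.
final class VertexSketch {
    static final class ExecutionAttempt {
        final int attemptNumber;
        boolean deployed;

        ExecutionAttempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }

        void deploy() {
            deployed = true;
        }
    }

    private ExecutionAttempt currentExecution = new ExecutionAttempt(0);

    // Like ExecutionVertex.deploy(): simply delegate to the current attempt.
    void deploy() {
        currentExecution.deploy();
    }

    // Hypothetical restart hook: replace the attempt with a fresh one, incrementing the number.
    void resetForNewAttempt() {
        currentExecution = new ExecutionAttempt(currentExecution.attemptNumber + 1);
    }

    ExecutionAttempt currentExecution() {
        return currentExecution;
    }
}
```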
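Each Execution is assigned a LogicalSlot. Through that LogicalSlot the Execution obtains the gateway of the corresponding TaskManager (TM). It then calls the TaskExecutor's submitTask method via RPC to submit the task.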
```java
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

    public void deploy() throws JobException {
        assertRunningInJobMasterMainThread();

        // The slot resource already assigned to this Execution
        final LogicalSlot slot = assignedResource;

        checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        // Check that the TM backing this slot is still alive
        if (!slot.isAlive()) {
            throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
        }

        // Check the current state of this Execution
        ExecutionState previous = this.state;
        if (previous == SCHEDULED || previous == CREATED) {
            if (!transitionState(previous, DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            // This Execution has already been canceled or deployed
            throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
        }

        // Check that the slot is really assigned to this Execution
        if (this != slot.getPayload()) {
            throw new IllegalStateException(
                String.format("The execution %s has not been assigned to the assigned slot.", this));
        }

        try {
            // If the state is no longer DEPLOYING (e.g. failed or canceled concurrently), release the slot
            if (this.state != DEPLOYING) {
                slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
                return;
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                    attemptNumber, getAssignedResourceLocation()));
            }

            // Create the TaskDeploymentDescriptor used to deploy the task to the TM
            final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
                .fromExecutionVertex(vertex, attemptNumber)
                .createDeploymentDescriptor(
                    slot.getAllocationId(),
                    slot.getPhysicalSlotNumber(),
                    taskRestore,
                    producedPartitions.values());

            // Null out taskRestore so that it can be garbage collected
            taskRestore = null;

            // Gateway of the TM that owns this slot
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            // Call the TM's submitTask method via RPC through the gateway to deploy the task on the TM
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);

        } catch (Throwable t) {
            markFailed(t);

            if (isLegacyScheduling()) {
                ExceptionUtils.rethrow(t);
            }
        }
    }
}
```
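Two ideas in Execution.deploy() are worth isolating. First, only CREATED or SCHEDULED may move to DEPLOYING, and the transition must fail if another caller changed the state in between. Second, the RPC result is handled asynchronously, and only the failure case triggers any action, with timeouts reported specially. The sketch below models the first idea with an AtomicReference compare-and-set and the second with CompletableFuture.orTimeout() (Java 9+). ExecutionDeploySketch is hypothetical: the submitTask supplier stands in for TaskManagerGateway.submitTask(), and timeoutMillis stands in for rpcTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of the state guard and async failure handling in Execution.deploy(); not Flink code.
final class ExecutionDeploySketch {
    enum State { CREATED, SCHEDULED, DEPLOYING, FAILED }

    final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
    final AtomicReference<String> failureCause = new AtomicReference<>();

    // submitTask stands in for the TM gateway RPC; timeoutMillis for rpcTimeout.
    CompletableFuture<Void> deploy(Supplier<CompletableFuture<String>> submitTask, long timeoutMillis) {
        State previous = state.get();
        if (previous != State.CREATED && previous != State.SCHEDULED) {
            throw new IllegalStateException("Must be CREATED or SCHEDULED to deploy, found " + previous);
        }
        // Compare-and-set: if another caller changed the state in between, this deploy loses the race.
        if (!state.compareAndSet(previous, State.DEPLOYING)) {
            throw new IllegalStateException("Cannot deploy task: concurrent deployment call race.");
        }
        return submitTask.get()
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((ack, failure) -> {
                    // Only the failure case is acted upon; on success the state stays DEPLOYING.
                    if (failure != null) {
                        Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                                ? failure.getCause() : failure;
                        markFailed(cause instanceof TimeoutException
                                ? "TaskManager not responding" : String.valueOf(cause));
                    }
                    return null;
                });
    }

    private void markFailed(String cause) {
        failureCause.set(cause);
        state.set(State.FAILED);
    }
}
```

A second deploy() on the same instance throws immediately, because the state is no longer CREATED or SCHEDULED. This matches the IllegalStateException branch of the original method.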
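After these steps, the TM receives the TaskDeploymentDescriptor submitted by the JobManager (JM), builds the Task thread, and starts it. Once all Task instances of the job have started, the system can process incoming data normally.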
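On the TM side, the essence of the step above is to take the descriptor, wrap the task's work in its own thread, start it, and acknowledge the submission. The sketch below models only that much. TaskExecutorSketch and its DeploymentDescriptor are hypothetical stand-ins, and Flink's TaskExecutor.submitTask() does considerably more than this. The duplicate check uses the task name as a key purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical TaskManager-side sketch; not Flink's TaskExecutor.
final class TaskExecutorSketch {
    // Drastically simplified stand-in for a TaskDeploymentDescriptor.
    static final class DeploymentDescriptor {
        final String taskName;
        final Runnable invokable;

        DeploymentDescriptor(String taskName, Runnable invokable) {
            this.taskName = taskName;
            this.invokable = invokable;
        }
    }

    private final Map<String, Thread> tasks = new ConcurrentHashMap<>();

    // Build a dedicated thread for the task, start it, and acknowledge the submission.
    CompletableFuture<String> submitTask(DeploymentDescriptor tdd) {
        Thread taskThread = new Thread(tdd.invokable, tdd.taskName);
        if (tasks.putIfAbsent(tdd.taskName, taskThread) != null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Task already submitted: " + tdd.taskName));
        }
        taskThread.start();
        return CompletableFuture.completedFuture("ACK");
    }
}
```

The ACK only means that the task thread was started, not that the task has finished. This matches the JobMaster side, which treats a successful submitTask() reply as "deployed" and otherwise acts only on failures.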