Checkpoint execution is divided into three phases: triggering, execution, and completion confirmation.
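- CheckpointCoordinator drives checkpoint execution: the CheckpointCoordinator component on the JobManager (JM) periodically sends a checkpoint request to the source tasks. Each source task sends the offsets it has consumed to the JM, where they are stored in the checkpoint metadata, and at the same time broadcasts a barrier downstream.
- Intermediate operators align barriers: an intermediate operator reads data through the StreamTaskNetworkInput component and aligns the barriers of all its input channels. Once the barriers are aligned, the StreamTask's checkpoint operation is triggered. The task writes a snapshot of its state to external persistent storage and sends an ack to the JM (the ack carries the task's state information).
- Tasks are notified once the checkpoint completes: when the JM has received the ack messages from all tasks, including the sinks, it confirms that this checkpoint is complete (that is, after the JM has serialized the checkpoint metadata and operator state to remote persistent storage or to memory) and sends a checkpoint-complete message to all task instances.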
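While a checkpoint is executing, the JM coordinates and manages all snapshots of the job. The components responsible for this are created when the ExecutionGraph is built.
During ExecutionGraph construction, the CompletedCheckpointStore, CheckpointStatsTracker, and CheckpointCoordinator components are created to monitor and manage the job's checkpoint operations.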

    public class ExecutionGraphBuilder {

        public static ExecutionGraph buildGraph(
                @Nullable ExecutionGraph prior,
                JobGraph jobGraph,
                Configuration jobManagerConfig,
                ScheduledExecutorService futureExecutor,
                Executor ioExecutor,
                SlotProvider slotProvider,
                ClassLoader classLoader,
                CheckpointRecoveryFactory recoveryFactory,
                Time rpcTimeout,
                RestartStrategy restartStrategy,
                MetricGroup metrics,
                BlobWriter blobWriter,
                Time allocationTimeout,
                Logger log,
                ShuffleMaster<?> shuffleMaster,
                JobMasterPartitionTracker partitionTracker,
                FailoverStrategy.Factory failoverStrategyFactory)
                throws JobExecutionException, JobException {

            // ...
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
            if (snapshotSettings != null) {
                // Source vertices: the CheckpointCoordinator actively triggers checkpoints on these
                List<ExecutionJobVertex> triggerVertices =
                    idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

                // ackVertices and confirmVertices hold all vertices of the job: every vertex
                // must return an ack and is notified when the checkpoint has succeeded
                List<ExecutionJobVertex> ackVertices =
                    idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
                List<ExecutionJobVertex> confirmVertices =
                    idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

                // Stores the metadata of completed checkpoints (only a fixed number is retained)
                CompletedCheckpointStore completedCheckpoints;
                // Generates the checkpoint IDs
                CheckpointIDCounter checkpointIdCounter;
                // ...

                // Tracks checkpoint execution and progress; the checkpoint data
                // shown in the web UI comes mainly from this tracker
                CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                    historySize,
                    ackVertices,
                    snapshotSettings.getCheckpointCoordinatorConfiguration(),
                    metrics);
                // ...

                // Enable checkpointing for the job; this is where the CheckpointCoordinator is created
                executionGraph.enableCheckpointing(
                    chkConfig,
                    triggerVertices,
                    ackVertices,
                    confirmVertices,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpoints,
                    rootBackend,
                    checkpointStatsTracker);
            }
            // ...
        }
    }


    public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {

        // ...
        // Timer used by the CheckpointCoordinator to periodically trigger checkpoints on the source tasks
        checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
            new DispatcherThreadFactory(
                Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

        // The checkpoint coordinator creates and tracks checkpoints and
        // coordinates and manages them across the job
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);

        // ...
        // Register a JobStatusListener; when the JobStatus becomes RUNNING,
        // the listener starts the CheckpointCoordinator
        if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    }

Checkpoint trigger process
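A checkpoint can be triggered in two ways. In the first, the source operators are coordinated and controlled by the CheckpointCoordinator, which uses a timer to periodically trigger the checkpoint on the source operator nodes. In the second, a downstream operator node decides when to checkpoint based on the barrier events sent by its upstream operator nodes.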
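CheckpointCoordinator triggers the operator checkpoint
The CheckpointCoordinator is responsible for triggering checkpoints on the source operator nodes and for managing the checkpoints of the whole job. It also receives the ack messages that the TaskManagers return after a checkpoint has been executed.
The CheckpointCoordinator is started through a listener: it starts when the JobStatus changes to RUNNING.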

    public class CheckpointCoordinatorDeActivator implements JobStatusListener {

        private final CheckpointCoordinator coordinator;
        // ...

        @Override
        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
            if (newJobStatus == JobStatus.RUNNING) {
                // Start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // Stop the checkpoint scheduler
                coordinator.stopCheckpointScheduler();
            }
        }
    }

The CheckpointCoordinator uses a timer to run the ScheduledTrigger runnable periodically.

    public class CheckpointCoordinator {

        private final ScheduledExecutor timer;

        // The timer runs ScheduledTrigger at a fixed rate
        private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
            return timer.scheduleAtFixedRate(
                new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
        }

        private final class ScheduledTrigger implements Runnable {
            @Override
            public void run() {
                try {
                    // Call triggerCheckpoint to start the checkpoint
                    triggerCheckpoint(System.currentTimeMillis(), true);
                } catch (Exception e) {
                    LOG.error("Exception while triggering checkpoint for job {}.", job, e);
                }
            }
        }
    }

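The listener and the timer together form a small start/stop scheduler. The sketch below reproduces that pattern with plain `java.util.concurrent` classes. `PeriodicTriggerSketch`, its string status values, and the counting `triggerCheckpoint` are hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the coordinator's scheduling logic; not Flink code. */
final class PeriodicTriggerSketch {

    // Single daemon thread playing the role of the "Checkpoint Timer".
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-timer-sketch");
            t.setDaemon(true);
            return t;
        });
    private final AtomicLong triggered = new AtomicLong();
    private final CountDownLatch firstThree = new CountDownLatch(3);
    private final long intervalMillis;
    private ScheduledFuture<?> currentTrigger;

    PeriodicTriggerSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Mirrors the listener: schedule on RUNNING, cancel on any other status. */
    synchronized void onJobStatusChange(String newStatus) {
        if (currentTrigger != null) {
            currentTrigger.cancel(false);
            currentTrigger = null;
        }
        if ("RUNNING".equals(newStatus)) {
            currentTrigger = timer.scheduleAtFixedRate(
                this::triggerCheckpoint, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Stand-in for the real trigger; it only counts invocations. */
    private void triggerCheckpoint() {
        triggered.incrementAndGet();
        firstThree.countDown();
    }

    /** Waits until three triggers have fired; returns false on timeout or interrupt. */
    boolean awaitThreeTriggers(long timeoutMillis) {
        try {
            return firstThree.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long triggeredCount() {
        return triggered.get();
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

Calling `onJobStatusChange("RUNNING")` starts the periodic trigger, and any other status cancels it, which is the same division of labour as between `CheckpointCoordinatorDeActivator` and `scheduleTriggerWithDelay` above.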
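CheckpointCoordinator.triggerCheckpoint() contains quite a lot of logic. Its main steps are:
- Pre-checkpoint checks
Check the preconditions and parameters for running the checkpoint, build the set of Executions for the task instances that take part in the checkpoint, and build the set of ExecutionVertex instances that must send an ack.
- Creating the PendingCheckpoint
From the moment the checkpoint starts until every task instance has returned a successful ack, the checkpoint stays in the pending state, which is how the coordinator makes sure it completes successfully. The PendingCheckpoint stores the checkpoint ID, the ackTasks, the snapshot storage location, and related information.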

    // Where the state snapshot data of this checkpoint will be stored
    final CheckpointStorageLocation checkpointStorageLocation;
    final long checkpointID;

    try {
        // Unique ID of the checkpoint; in an HA cluster the counter is backed by ZooKeeper
        checkpointID = checkpointIdCounter.getAndIncrement();

        checkpointStorageLocation = props.isSavepoint()
            ? checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation)
            : checkpointStorage.initializeLocationForCheckpoint(checkpointID);
    } catch (Throwable t) {
        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
            job, numUnsuccessful, t);
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }

    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor);

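- Triggering and completing the checkpoint
The coordinator iterates over the Executions of all source operators and triggers the checkpoint on the TaskExecutor that hosts each of them.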

    Execution[] executions = new Execution[tasksToTrigger.length];
    // ...

    // Trigger the checkpoint on all source operators
    for (Execution execution : executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }

    // Reset the failure counter and return the checkpoint's completion future
    numUnsuccessfulCheckpointsTriggers.set(0);
    return checkpoint.getCompletionFuture();

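Next, the corresponding TaskManagerGateway is obtained from the Execution's LogicalSlot, and TaskExecutor.triggerCheckpoint() is invoked through that gateway.
The TaskExecutor then looks up the corresponding Task in its taskSlotTable, and finally StreamTask.triggerCheckpointAsync() is called to perform the checkpoint.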
The TaskManager gateway is obtained from the Execution's slot, and the checkpoint on the corresponding Task is triggered through an RPC call:

    public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {

        private void triggerCheckpointHelper(
                long checkpointId,
                long timestamp,
                CheckpointOptions checkpointOptions,
                boolean advanceToEndOfEventTime) {

            final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
            if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
                throw new IllegalArgumentException(
                    "only synchronous savepoints are allowed to advance the watermark to MAX.");
            }

            // The LogicalSlot currently assigned to this Execution
            final LogicalSlot slot = assignedResource;

            if (slot != null) {
                // Get the TaskManager gateway from the LogicalSlot
                final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

                // RPC call to the TaskManager to trigger the checkpoint on the corresponding Task
                taskManagerGateway.triggerCheckpoint(
                    attemptId, getVertex().getJobId(), checkpointId, timestamp,
                    checkpointOptions, advanceToEndOfEventTime);
            } else {
                LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
            }
        }
    }

StreamTask triggers the checkpoint:

    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {

        @Override
        public Future<Boolean> triggerCheckpointAsync(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                boolean advanceToEndOfEventTime) {

            // Wrap the checkpoint operation in a mail and submit it to the
            // StreamTask's mailbox, where it is scheduled with the other work
            return mailboxProcessor.getMainMailboxExecutor().submit(
                () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
                "checkpoint %s with %s",
                checkpointMetaData,
                checkpointOptions);
        }
        // ...
    }

Barrier alignment triggers the checkpoint
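Barrier alignment in the StreamTask is driven by the CheckpointedInputGate (a wrapper around InputGate that adds barrier alignment) as it reads data from the network. The detailed flow was covered in the previous section on the StreamTask data flow.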
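As a reminder of what alignment means, here is a minimal single-threaded sketch. It assumes each channel delivers exactly one barrier per checkpoint and models records as strings. `BarrierAlignerSketch` and its methods are hypothetical and only illustrate the idea; Flink's actual implementation is considerably more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded model of barrier alignment; not Flink code. */
final class BarrierAlignerSketch {

    private final boolean[] blocked;            // channels that already delivered the barrier
    private final List<Queue<String>> buffered; // records held back per blocked channel
    private int barriersSeen;

    /** Everything the operator processed, in processing order. */
    final List<String> output = new ArrayList<>();

    BarrierAlignerSketch(int numChannels) {
        blocked = new boolean[numChannels];
        buffered = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            buffered.add(new ArrayDeque<String>());
        }
    }

    /** Records on a blocked channel belong to the next checkpoint and are held back. */
    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffered.get(channel).add(record);
        } else {
            output.add(record);
        }
    }

    /** Blocks the channel; once every channel has delivered the barrier, snapshot and release. */
    void onBarrier(int channel, long checkpointId) {
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            // All barriers aligned: this is the point where the task takes its snapshot.
            output.add("SNAPSHOT-" + checkpointId);
            for (int i = 0; i < blocked.length; i++) {
                blocked[i] = false;
                Queue<String> held = buffered.get(i);
                while (!held.isEmpty()) {
                    output.add(held.poll());
                }
            }
            barriersSeen = 0;
        }
    }
}
```

With two channels, a record that arrives on channel 0 after its barrier is only processed after the snapshot marker, while records on channel 1 keep flowing until its own barrier arrives.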
Once the barriers of all channels are aligned, StreamTask.performCheckpoint() is called to produce the snapshot of the current task.

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        final long checkpointId = checkpointMetaData.getCheckpointId();

        if (isRunning) {
            actionExecutor.runThrowing(() -> {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    setSynchronousSavepointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps should happen as one atomic unit

                // Step (1): do the pre-barrier work; usually nothing or only something lightweight
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): broadcast the barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                    checkpointId, checkpointMetaData.getTimestamp(), checkpointOptions);

                // Step (3): snapshot all operators; the bulk of this step runs asynchronously
                // and does not affect normal processing of the data stream
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            });

            return true;
        } else {
            actionExecutor.runThrowing(() -> {
                // The task is not RUNNING: broadcast a CancelCheckpointMarker
                // downstream to cancel this checkpoint
                final CancelCheckpointMarker message =
                    new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                recordWriter.broadcastEvent(message);
            });

            return false;
        }
    }

Next we walk through how the StreamTask actually takes the snapshot.
- CheckpointingOperation performs the checkpoint

    public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
            extends AbstractInvokable
            implements AsyncExceptionHandler {

        private void checkpointState(
                CheckpointMetaData checkpointMetaData,
                CheckpointOptions checkpointOptions,
                CheckpointMetrics checkpointMetrics) throws Exception {

            // Create the CheckpointStreamFactory used to actually store the state.
            // There are Memory and FS implementations, which write the output
            // streams to memory and to a file system respectively
            CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());

            // CheckpointingOperation wraps the concrete steps of executing the checkpoint
            CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                storage,
                checkpointMetrics);

            // Execute the checkpoint
            checkpointingOperation.executeCheckpointing();
        }
    }


    private static final class CheckpointingOperation {

        public void executeCheckpointing() throws Exception {
            // Create an OperatorSnapshotFutures object for every operator of the StreamTask
            // and collect them all in operatorSnapshotsInProgress
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }
            // ...

            // AsyncCheckpointRunnable carries out the actual snapshot work
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

            // Run the snapshots of all operators in operatorSnapshotsInProgress asynchronously
            // on the StreamTask's asyncOperationsThreadPool
            owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
        }

        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // Wrap the snapshot work of this operator in an OperatorSnapshotFutures
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }
    }

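- Wrapping an operator's snapshot work in OperatorSnapshotFutures
The snapshotState method shows that raw state and managed state are snapshotted differently.
(1) Raw state: the snapshot operation is obtained from the snapshotContext;
(2) Managed state: it is handled by the operatorStateBackend and the keyedStateBackend, which write the state to memory or to an external file system depending on the StateBackend implementation.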

    public final OperatorSnapshotFutures snapshotState(
            long checkpointId,
            long timestamp,
            CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        // 1. If there is a keyedStateBackend, get its KeyGroupRange
        KeyGroupRange keyGroupRange = null != keyedStateBackend
            ? keyedStateBackend.getKeyGroupRange()
            : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        // 2. The OperatorSnapshotFutures object wraps the snapshot work of this operator
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        // 3. Context information needed while taking the snapshot
        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());

        try {
            // Take the snapshot
            snapshotState(snapshotContext);

            // Set the KeyedStateRawFuture and OperatorStateRawFuture, which handle the raw state snapshot
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // Register the snapshot methods of operatorStateBackend and keyedStateBackend
            // in snapshotInProgress, where they wait to be executed
            if (null != operatorStateBackend) {
                // Asynchronous future for the OperatorState snapshot
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                // Asynchronous future for the KeyedState snapshot
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            // ...
        }

        // snapshotInProgress now wraps every snapshot operation this operator has to run
        return snapshotInProgress;
    }

- Definition and execution of AsyncCheckpointRunnable
All state snapshot operations are wrapped in OperatorSnapshotFutures objects and are finally executed by the AsyncCheckpointRunnable.

    protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {

        @Override
        public void run() {
            // 1. Initialize the file system safety net for the current thread
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                // Checkpoint data that is sent to the JM
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                // State data kept locally on the TaskExecutor
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());

                // Iterate over the OperatorSnapshotFutures of all operators in the StreamTask
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();

                    // Runs all pending snapshot futures, i.e. the KeyedState and OperatorState snapshots
                    OperatorSnapshotFinalizer finalizedSnapshots =
                        new OperatorSnapshotFinalizer(snapshotInProgress);

                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID, finalizedSnapshots.getJobManagerOwnedState());
                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID, finalizedSnapshots.getTaskLocalState());
                }
                // ...

                if (asyncCheckpointState.compareAndSet(
                        CheckpointingOperation.AsyncCheckpointState.RUNNING,
                        CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    // The asynchronous snapshot is done: report the result to the JM
                    // and send it the state
                    reportCompletedSnapshotStates(
                        jobManagerTaskOperatorSubtaskStates,
                        localTaskOperatorSubtaskStates,
                        asyncDurationMillis);
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        owner.getName(), checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                // ...
            }
        }
    }

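The split into a synchronous part (collecting futures on the task thread) and an asynchronous part (running them on a thread pool) is easier to see in isolation. The following is a minimal sketch under simplified assumptions: operator state is a list of integers, and "writing" a snapshot just produces a string. `AsyncSnapshotSketch`, `prepare`, and `runAsyncPart` are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical two-phase snapshot: capture synchronously, write asynchronously. */
final class AsyncSnapshotSketch {

    /** Synchronous part: copy each operator's state and wrap the write in a future that has not run yet. */
    static Map<String, RunnableFuture<String>> prepare(Map<String, List<Integer>> operatorStates) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : operatorStates.entrySet()) {
            String operatorId = e.getKey();
            // The copy isolates the snapshot from records processed afterwards.
            List<Integer> copy = new ArrayList<>(e.getValue());
            inProgress.put(operatorId, new FutureTask<String>(() -> operatorId + "=" + copy));
        }
        return inProgress;
    }

    /** Asynchronous part: run every future on the pool and collect the results to report. */
    static Map<String, String> runAsyncPart(
            Map<String, RunnableFuture<String>> inProgress, ExecutorService pool) {
        Future<Map<String, String>> report = pool.submit(() -> {
            Map<String, String> finished = new LinkedHashMap<>();
            for (Map.Entry<String, RunnableFuture<String>> e : inProgress.entrySet()) {
                e.getValue().run();                           // performs the "write"
                finished.put(e.getKey(), e.getValue().get()); // already done, returns immediately
            }
            return finished;
        });
        try {
            return report.get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException("async snapshot part failed", ex);
        }
    }
}
```

Because the state is copied in `prepare`, an update made to the live state between the two phases does not show up in the reported snapshot. That isolation is what lets the task keep processing records while the snapshot is being written.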
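Managed state in an operator is handled mainly by the KeyedStateBackend and the OperatorStateBackend. Both state backends implement the SnapshotStrategy interface, which provides the state snapshot method.
Depending on the state backend, the SnapshotStrategy implementations fall mainly into HeapSnapshotStrategy and RocksDBSnapshotStrategy. RocksDBSnapshotStrategy in turn has two subclasses, one for incremental and one for full snapshots.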
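- Sending the AcknowledgeCheckpoint message to the CheckpointCoordinator
After all operators in the StreamTask have finished snapshotting their state, the task instance sends the TaskStateSnapshot to the CheckpointCoordinator on the JM, which carries out the remaining steps: confirming that the acks of all task instances have arrived, converting the current PendingCheckpoint into a CompletedCheckpoint, writing the checkpoint metadata to the external persistent file system, and so on.
The confirmation of a checkpoint proceeds as follows:
(1) When all operators in the StreamTask have finished their snapshots, StreamTask.reportCompletedSnapshotStates is called to hand the snapshot information to the TaskStateManager;
(2) The TaskStateManager sends the checkpoint ack to the CheckpointCoordinator through the CheckpointCoordinatorGateway;
(3) When the JobMaster receives the ack, it calls SchedulerNG.acknowledgeCheckpoint, which wraps the ack in an AcknowledgeCheckpoint object and passes it to the CheckpointCoordinator;
(4) The CheckpointCoordinator looks up the corresponding PendingCheckpoint and checks whether the acks of all task instances have been received. If they have, it calls completePendingCheckpoint to complete the current PendingCheckpoint;
(5) The PendingCheckpoint is converted into a CompletedCheckpoint. At this point the checkpoint metadata and the operator state data are serialized to the external file system or to memory, and the CompletedCheckpoint is added to the collection of completed checkpoints;
(6) The CheckpointCoordinator iterates over the Executions of all tasks and calls each Task instance's notifyCheckpointComplete method via RPC.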
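Steps (4) and (5) boil down to bookkeeping over the set of tasks that still owe an ack. A minimal sketch of that bookkeeping follows, with task IDs and state handles modeled as plain strings. `PendingCheckpointSketch` and its methods are hypothetical and much simpler than Flink's PendingCheckpoint.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical ack bookkeeping for one in-flight checkpoint; not Flink code. */
final class PendingCheckpointSketch {

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> acknowledgedStates = new LinkedHashMap<>();

    PendingCheckpointSketch(long checkpointId, Collection<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new LinkedHashSet<>(tasksToAck);
    }

    /**
     * Records one ack. Returns true only when this ack was the last one outstanding;
     * unknown or duplicate acks are ignored and return false.
     */
    boolean acknowledge(String taskId, String stateHandle) {
        if (!notYetAcknowledged.remove(taskId)) {
            return false;
        }
        acknowledgedStates.put(taskId, stateHandle);
        return notYetAcknowledged.isEmpty();
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** Converts the pending checkpoint into its completed, read-only form. */
    Map<String, String> finalizeCheckpoint() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException(
                "checkpoint " + checkpointId + " still waits for " + notYetAcknowledged);
        }
        return Collections.unmodifiableMap(new LinkedHashMap<String, String>(acknowledgedStates));
    }
}
```

The coordinator would call `finalizeCheckpoint()` as soon as `acknowledge` returns true and only then notify the tasks, which matches the order of steps (4) to (6).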
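After the CheckpointCoordinator has received the ack responses of all task instances, it calls PendingCheckpoint.finalizeCheckpoint to convert the PendingCheckpoint into a CompletedCheckpoint and writes the checkpoint metadata out to the external file system.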

    public CompletedCheckpoint finalizeCheckpoint() throws IOException {
        synchronized (lock) {
            try {
                // write out the metadata
                final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
                final CompletedCheckpointStorageLocation finalizedLocation;

                try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                    Checkpoints.storeCheckpointMetadata(savepoint, out);
                    finalizedLocation = out.closeAndFinalizeCheckpoint();
                }

                CompletedCheckpoint completed = new CompletedCheckpoint(
                    jobId,
                    checkpointId,
                    checkpointTimestamp,
                    System.currentTimeMillis(),
                    operatorStates,
                    masterStates,
                    props,
                    finalizedLocation);

                return completed;
            } catch (Throwable t) {
                onCompletionPromise.completeExceptionally(t);
                ExceptionUtils.rethrowIOException(t);
                return null; // silence the compiler
            }
        }
    }

Finally, SavepointV2Serializer is called to serialize the state and write it to the external file system or to memory:

    public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {

        @Override
        public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
            // first: checkpoint ID
            dos.writeLong(checkpointMetadata.getCheckpointId());

            // second: master state
            final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
            dos.writeInt(masterStates.size());
            for (MasterState ms : masterStates) {
                serializeMasterState(ms, dos);
            }

            // third: operator states
            Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
            dos.writeInt(operatorStates.size());

            for (OperatorState operatorState : operatorStates) {
                // Operator ID
                dos.writeLong(operatorState.getOperatorID().getLowerPart());
                dos.writeLong(operatorState.getOperatorID().getUpperPart());

                // Parallelism
                int parallelism = operatorState.getParallelism();
                dos.writeInt(parallelism);
                dos.writeInt(operatorState.getMaxParallelism());
                dos.writeInt(1);

                // Sub task states
                Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
                dos.writeInt(subtaskStateMap.size());
                for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
                    dos.writeInt(entry.getKey());
                    serializeSubtaskState(entry.getValue(), dos);
                }
            }
        }
    }

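The serializer relies on a simple length-prefixed layout: an ID, then a count, then that many entries. The sketch below shows the same pattern as a round trip with `DataOutputStream` and `DataInputStream`. It is a deliberately reduced, hypothetical format (one checkpoint ID plus subtask index and a string handle per entry), not the actual SavepointV2 layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical length-prefixed metadata layout; not the real SavepointV2 format. */
final class MetadataLayoutSketch {

    /** Writes: checkpoint ID, entry count, then (subtask index, handle) pairs. */
    static byte[] write(long checkpointId, Map<Integer, String> subtaskStates) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(checkpointId);
            dos.writeInt(subtaskStates.size());
            for (Map.Entry<Integer, String> e : subtaskStates.entrySet()) {
                dos.writeInt(e.getKey());
                dos.writeUTF(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Reads back only the leading checkpoint ID. */
    static long readCheckpointId(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            return dis.readLong();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Reads the entries in exactly the order in which write() produced them. */
    static Map<Integer, String> readSubtaskStates(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            dis.readLong(); // skip the checkpoint ID
            int count = dis.readInt();
            Map<Integer, String> result = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                int subtaskIndex = dis.readInt();
                result.put(subtaskIndex, dis.readUTF());
            }
            return result;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Writing the count before the entries is what lets the reader know how many entries to expect without a terminator, which is the same reason the serializer above writes `masterStates.size()` and `operatorStates.size()` before each loop.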