Flink Checkpoint机制剖析(源码剖析)

Flink Checkpoint机制剖析(源码剖析),第1张

Flink Checkpoint机制剖析(源码剖析) Checkpoint整体设计

Checkpoint执行过程分为:启动、执行以及确认完成三个阶段。

    CheckpointCoordinator控制Checkpoint执行:JM端的CheckpointCoordinator组件会周期性的向数据源发送执行CK的请求,数据源节点将数据源消费的offset发送给JM,存储到CK的元数据信息中。同时向下广播barrier。中间算子对齐barrier:中间算子在StreamTaskNetworkInput组件中读取数据并对齐各个channel的barrier。barrier对齐后,触发StreamTask的CK *** 作。将状态数据快照存储到外部持久化介质中,并向JM发送ack响应(会携带该task的状态信息)。CK完成后向Task发送通知:当JM接收到所有sink节点的ack消息后,JM确认本次CK *** 作完成(JM将CK元数据和算子状态序列化到远程持久化存储或内存之后),向所有Task实例发送本次CK完成的消息。

在执行Checkpoint的过程中,JM会对job中所有的快照进行统一协调和管理。在创建ExecutionGrap时会创建对应的组件。
在ExecutionGrap创建过程中会生成CompletedCheckpointStore、CheckpointStatsTracker 、CheckpointCoordinator组件用于监控和管理Job中的CK *** 作

public class ExecutionGraphBuilder {
	public static ExecutionGraph buildGraph(
		@Nullable ExecutionGraph prior,
		JobGraph jobGraph,
		Configuration jobManagerConfig,
		ScheduledExecutorService futureExecutor,
		Executor ioExecutor,
		SlotProvider slotProvider,
		ClassLoader classLoader,
		CheckpointRecoveryFactory recoveryFactory,
		Time rpcTimeout,
		RestartStrategy restartStrategy,
		MetricGroup metrics,
		BlobWriter blobWriter,
		Time allocationTimeout,
		Logger log,
		ShuffleMaster shuffleMaster,
		JobMasterPartitionTracker partitionTracker,
		FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {
		//。。。
		JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
		if (snapshotSettings != null) {
			//获取source节点,这些节点通过CheckpointCoordinator主动触发CK
			List triggerVertices =
					idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
			//ackVertices、/confirm/iVertices存储了StreamGrap的全部节点,所有节点都需要返回Ack确认信息并确认CK执行成功。
			List ackVertices =
					idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

			List confirmVertices =
					idToVertex(snapshotSettings.getVerticesTo/confirm/i(), executionGraph);
			//存储CK的元数据信息
			CompletedCheckpointStore completedCheckpoints;
			//通过counter保证只存储固定数量的CompletedCheckpoint
			CheckpointIDCounter checkpointIdCounter;
			//。。。
			//用于监控和追踪CK执行和更新的情况,WebUI显示的CK数据主要就来自于该tracker
			CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
					historySize,
					ackVertices,
					snapshotSettings.getCheckpointCoordinatorConfiguration(),
					metrics);
			//。。。
			//在作业执行调度中开启CK,期间会创建CheckpointCoordinator组件
			executionGraph.enableCheckpointing(
				chkConfig,
				triggerVertices,
				ackVertices,
				/confirm/iVertices,
				hooks,
				checkpointIdCounter,
				completedCheckpoints,
				rootBackend,
				checkpointStatsTracker);
		}
	}
}
	public void enableCheckpointing(
			CheckpointCoordinatorConfiguration chkConfig,
			List verticesToTrigger,
			List verticesToWaitFor,
			List verticesToCommitTo,
			List> masterHooks,
			CheckpointIDCounter checkpointIDCounter,
			CompletedCheckpointStore checkpointStore,
			StateBackend checkpointStateBackend,
			CheckpointStatsTracker statsTracker) {
			//。。。

			//timer定时器用于CheckpointCoordinator定时触发Source节点的CK *** 作
			checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
			new DispatcherThreadFactory(
				Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

		// CK协调器,用于创建和保持检查点状态等功能,协调和管理Job中的Checkpoint
		checkpointCoordinator = new CheckpointCoordinator(
			jobInformation.getJobId(),
			chkConfig,
			tasksToTrigger,
			tasksToWaitFor,
			tasksToCommitTo,
			checkpointIDCounter,
			checkpointStore,
			checkpointStateBackend,
			ioExecutor,
			new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
			SharedStateRegistry.DEFAULT_FACTORY,
			failureManager);
	}
	//注册JobStatusListener监听器,当JobStatus变为running时,通过监听器启动CheckpointCoordinator
	if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
			registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
		}
Checkpoint触发过程

CK的触发过程有两种方式:一种时source算子通过CheckpointCoordinator组件进行协调和控制,CheckpointCoordinator通过定时器的方式定时触发source算子节点的CK *** 作。另一种是下游算子节点根据上游算子节点发送的barrier事件控制CK的触发时机。

CheckpointCoordinator触发算子Checkpoint *** 作

CheckpointCoordinator负责Source算子节点CK *** 作以及整个作业的CK管理,并且CheckpointCoordinator组件会接收TaskManager在CK执行完成之后返回的Ack信息。

CheckpointCoordinator用过监听器启动,当JobStatus变为RUNNING状态时启动CheckpointCoordinator。

public class CheckpointCoordinatorDeActivator implements JobStatusListener {
	@Override
	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
		if (newJobStatus == JobStatus.RUNNING) {
			// 启动CK调度程序
			coordinator.startCheckpointScheduler();
		} else {
			// 停止CK调度
			coordinator.stopCheckpointScheduler();
		}
	}
}

CheckpointCoordinator通过定时器周期性的触发ScheduledTrigger线程

public class CheckpointCoordinator {
	private final ScheduledExecutor timer;
	//通过timer定时器周期性触发ScheduledTrigger线程
	private ScheduledFuture scheduleTriggerWithDelay(long initDelay) {
		return timer.scheduleAtFixedRate(
			new ScheduledTrigger(),
			initDelay, baseInterval, TimeUnit.MILLISECONDS);
	}

	private final class ScheduledTrigger implements Runnable {
		@Override
		public void run() {
			try {
				//调用triggerCheckpoint方法触发CK *** 作
				triggerCheckpoint(System.currentTimeMillis(), true);
			}
			catch (Exception e) {
				LOG.error("Exception while triggering checkpoint for job {}.", job, e);
			}
		}
	}
}

CheckpointCoordinator.triggerCheckpoint()方法的逻辑比较多,主要包括以下步骤:

    CK *** 作前的检查 *** 作:
    检查CK的执行环节和参数、构建CK *** 作对应Task节点实例的Execution集合、构建需要发送Ack消息的ExecutionVertex集合。创建PendingCheckpoint
    从开始执行CK *** 作直到所有Task实例返回Ack确认成功消息,CK会一直处于Pending状态,确保Ck能被成功执行。PendingCheckpoint存储了ID、ackTasks、快照存储位置等信息
		//定义CK过程中状态快照数据存放位置
		final CheckpointStorageLocation checkpointStorageLocation;
		final long checkpointID;

		try {
			// CK的唯一标记,HA集群会通过Zookeeper实现checkpointID计数
			checkpointID = checkpointIdCounter.getAndIncrement();

			checkpointStorageLocation = props.isSavepoint() ?
					checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
					checkpointStorage.initializeLocationForCheckpoint(checkpointID);
		}
		catch (Throwable t) {
			int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
			LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
					job,
					numUnsuccessful,
					t);
			throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
		}

		final PendingCheckpoint checkpoint = new PendingCheckpoint(
			job,
			checkpointID,
			timestamp,
			ackTasks,
			masterHooks.keySet(),
			props,
			checkpointStorageLocation,
			executor);
    CK *** 作的触发和完成
    会遍历所有Source算子的Execution节点,触发节点所在TaskExecutor的CK *** 作。
		Execution[] executions = new Execution[tasksToTrigger.length];
		
		//。。。
		//触发所有Source算子的CK *** 作
		for (Execution execution: executions) {
			if (props.isSynchronous()) {
				execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
			} else {
				execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
			}
		}
		//返回CK中的CompletionFuture对象
		numUnsuccessfulCheckpointsTriggers.set(0);
		return checkpoint.getCompletionFuture();

之后的会通过Execution的LogicalSlot拿到对于的TaskManagerGateway,然后通过TaskManagerGateway调用TaskExecutor.triggerCheckpoint()。
再从TaskExecutor的taskSlotTable中拿到对于的Task线程,最后调用StreamTask.triggerCheckpointAsync()方法执行CK *** 作。

通过Execution的Slot资源获取到TaskManger对应的网关,通过RPC调用触发对应Task的CK *** 作
public class Execution implements AccessExecution, Archiveable, LogicalSlot.Payload {
	private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

		final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
		if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
			throw new IllegalArgumentException("only synchronous savepoints are allowed to advance the watermark to MAX.");
		}
		//获取当前Execution分配的LogicalSlot资源
		final LogicalSlot slot = assignedResource;

		if (slot != null) {
			//通过LogicalSlot获取到TaskManager对应的网关
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
			//RPC调用TaskManager触发对应Task的CK *** 作
			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
		} else {
			LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
		}
	}
}
StreamTask触发CK *** 作:
public abstract class StreamTask>
		extends AbstractInvokable
		implements AsyncExceptionHandler {
	@Override
	public Future triggerCheckpointAsync(
			CheckpointmetaData checkpointmetaData,
			CheckpointOptions checkpointOptions,
			boolean advanceToEndOfEventTime) {
		//将CK *** 作封装为Mail,发送到StreamTask的MailBox中进行调度
		return mailboxProcessor.getMainMailboxExecutor().submit(
				() -> triggerCheckpoint(checkpointmetaData, checkpointOptions, advanceToEndOfEventTime),
				"checkpoint %s with %s",
			checkpointmetaData,
			checkpointOptions);
	}
对齐Barrier触发CK *** 作

StreamTask的Barrier对齐是通过CheckpointInputGate(封装的InputGate,具有barrier对齐功能)读取网络数据时触发的,这里过具体流程我们在上一节StreamTask数据流中已经介绍过了。
当所有channel的barrier对齐之后就会触发StreamTask.performCheckpoint()方法,生成当前Task的快照。

	private boolean performCheckpoint(
			CheckpointmetaData checkpointmetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics,
			boolean advanceToEndOfTime) throws Exception {

		LOG.debug("Starting checkpoint ({}) {} on task {}",
			checkpointmetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

		final long checkpointId = checkpointmetaData.getCheckpointId();

		if (isRunning) {
			actionExecutor.runThrowing(() -> {

				if (checkpointOptions.getCheckpointType().isSynchronous()) {
					setSynchronousSavepointId(checkpointId);

					if (advanceToEndOfTime) {
						advanceToEndOfEventTime();
					}
				}

				// 一下所有 *** 作应该是原子性的
				// Step (1): 执行一些预屏障工作,一般是不执行或执行一些轻量级的工作
				operatorChain.prepareSnapshotPreBarrier(checkpointId);

				// Step (2): 将barrier向下游广播出去
				operatorChain.broadcastCheckpointBarrier(
						checkpointId,
						checkpointmetaData.getTimestamp(),
						checkpointOptions);

				// Step (3): 对所有算子进行快照 *** 作,该步骤是异步 *** 作,不影响数据流的正常处理
				checkpointState(checkpointmetaData, checkpointOptions, checkpointMetrics);

			});

			return true;
		} else {
			actionExecutor.runThrowing(() -> {
				// Task没有处于RUNNING状态,向下游广播CancelCheckpointMarker事件,取消此次CK
				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointmetaData.getCheckpointId());
				recordWriter.broadcastEvent(message);
			});

			return false;
		}
	}

接下来我们讲解StreamTask执行快照 *** 作的具体过程。

    CheckpointingOperation 执行CK *** 作
public abstract class StreamTask>
		extends AbstractInvokable
		implements AsyncExceptionHandler {
	private void checkpointState(
			CheckpointmetaData checkpointmetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics) throws Exception {
		//创建CheckpointStreamFactory实例,用于具体的状态存储,
		//有Memory和FS两种实现,分别支持内存和文件文件类型系统的数据流输出
		CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
				checkpointmetaData.getCheckpointId(),
				checkpointOptions.getTargetLocation());
		// CheckpointingOperation 封装了CK执行的具体 *** 作
		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
			this,
			checkpointmetaData,
			checkpointOptions,
			storage,
			checkpointMetrics);
		//执行CK *** 作
		checkpointingOperation.executeCheckpointing();
	}
}
private static final class CheckpointingOperation {
	public void executeCheckpointing() throws Exception {
		//对StreamTask的所有算子创建执行快照 *** 作的OperatorSnapshotFutures对象,
		//并将所有算子的快照 *** 作存储在operatorSnapshotsInProgress集合中
		for (StreamOperator op : allOperators) {
			checkpointStreamOperator(op);
		}
		
		//。。。

		//AsyncCheckpointRunnable线程执行具体快照 *** 作
		AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
			owner,
			operatorSnapshotsInProgress,
			checkpointmetaData,
			checkpointMetrics,
			startAsyncPartNano);
		//通过StreamTask的asyncOperationsThreadPool线程池,异步执行operatorSnapshotsInProgress集合中所有算子的快照 *** 作
		owner.cancelables.registerCloseable(asyncCheckpointRunnable);
		owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
	}

	private void checkpointStreamOperator(StreamOperator op) throws Exception {
			if (null != op) {
				//将当前算子的快照 *** 作封装到OperatorSnapshotFutures中
				OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
						checkpointmetaData.getCheckpointId(),
						checkpointmetaData.getTimestamp(),
						checkpointOptions,
						storageLocation);
				operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
			}
		}
}
    将算子中的状态快照 *** 作封装到OperatorSnapshotFutures 中
    从此处我们可以看出,原生状态和管理状态的状态生成过程不同。
    (1)原生状态主要通过从snapshotContext中获取原生状态的快照 *** 作;
    (2)管理状态主要通过operatorStateBackend&keyedStateBackend进行状态管理,并根据StateBackend的不同实现将状态写入内存或外部文件系统中。
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
			CheckpointStreamFactory factory) throws Exception {
		//1.如果有keyedStateBackend ,获取对于的KeyGroupRange 
		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
		//2.OperatorSnapshotFutures 对象,封装当前算子的状态快照 *** 作
		OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
		//3.存储快照过程需要的上下文信息
		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
			checkpointId,
			timestamp,
			factory,
			keyGroupRange,
			getContainingTask().getCancelables());

		try {
			//执行快照 *** 作
			snapshotState(snapshotContext);
			//设置KeyedStateRawFuture&OperatorStateRawFuture,用于处理原生数据快照
			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
			snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
			//将operatorStateBackend&keyedStateBackend的状态快照方法注册到snapshotInProgress中,等待执行
			if (null != operatorStateBackend) {
				//设置OperatorState快照的异步future
				snapshotInProgress.setOperatorStateManagedFuture(
					operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
			}

			if (null != keyedStateBackend) {
				//设置KeyedState快照的异步future
				snapshotInProgress.setKeyedStateManagedFuture(
					keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
			}
		} catch (Exception snapshotException) {
			//。。。
		}
		//snapshotInProgress中封装了当前算子需要执行的所有快照 *** 作
		return snapshotInProgress;
	}
    AsyncCheckpointRunnable 线程的定义和执行
    所有的状态快照 *** 作都会被封装到OperatorSnapshotFutures对象中,最终通过AsyncCheckpointRunnable 线程触发执行。
protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
	@Override
	public void run() {
		//1.为当前线程初始化文件系统安全网,确保数据正确写入
		FileSystemSafetyNet.initializeSafetyNetForThread();
		try {
			//发送给JM的CK数据
			TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
				new TaskStateSnapshot(operatorSnapshotsInProgress.size());
			//TaskExecutor本地的状态数据
			TaskStateSnapshot localTaskOperatorSubtaskStates =
				new TaskStateSnapshot(operatorSnapshotsInProgress.size());
			//遍历获取StreamTask中所有算子的OperatorSnapshotFutures对象
			for (Map.Entry entry : operatorSnapshotsInProgress.entrySet()) {

				OperatorID operatorID = entry.getKey();
				OperatorSnapshotFutures snapshotInProgress = entry.getValue();

				// 用于执行所有状态快照线程 *** 作,会执行KeyedState&OperatorState的快照 *** 作
				OperatorSnapshotFinalizer finalizedSnapshots =
					new OperatorSnapshotFinalizer(snapshotInProgress);
				
				jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
					operatorID,
					finalizedSnapshots.getJobManagerOwnedState());

				localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
					operatorID,
					finalizedSnapshots.getTaskLocalState());
			}

			//。。。
			
			if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
				CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
				//异步快照完成,向JM汇报CK的执行结果,并将状态发送给JM
				reportCompletedSnapshotStates(
					jobManagerTaskOperatorSubtaskStates,
					localTaskOperatorSubtaskStates,
					asyncDurationMillis);

			} else {
				LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
					owner.getName(),
					checkpointmetaData.getCheckpointId());
			}
		} catch (Exception e) {
			//。。。
		}
	}
}

算子中托管状态主要借助KeyedStateBackend&OperatorStateBackend管理,两个状态后端都实现了SnapshotStrategy接口,提供了状态快照方法。
SnapshotStrategy根据不同的状态后端,主要分为HeapSnapshotStrategy和RocksDBSnapshotStrategy,其中RocksDBSnapshotStrategy又分为增量和全量两种子类实现。

    发送AcknowledgeCheckpoint消息到CheckpointCoordinator中
    在StreamTask中所有算子都完成状态数据的快照之后,Task实例会将TaskStateSnapshot消息发送给JM的CheckpointCoordinator,并在CheckpointCoordinator中完成后续 *** 作,例如确认接受到所有Task实例的Ack消息以及将当前的PendingCheckpoint转换为CompleteCheckpoint,并将CK元数据写到外部持久化文件系统中等 *** 作。


Checkpoint的确认过程主要如下:
(1)StreamTask中所有算子快照完成后,调用StreamTask.reportCompletedSnapshotStates方法将快照等信息发送给TaskStateManager;
(2)TaskStateManager通过CheckpointCoordinatorGateway将CK的Ack信息发送给CheckpointCoordinator;
(3)JobMaster收到Ack消息之后,调用SchedulerNG.acknowledgeCheckpoint方法将Ack消息封装为AcknowledgeCheckpoint对象,传递给CheckpointCoordinator;
(4)CheckpointCoordinator取出对于的PendingCheckpoint,判断是否所有Task实例都Ack消息都收到了,如果所有Task的Ack都已收到,则调用completePendingCheckpoint方法完成当前PendingCheckpoint *** 作;
(5)将PendingCheckpoint转化为CompleteCheckpoint,此时会将该CK的元数据和算子状态数据序列化到外部文件系统或内存中,并将CompleteCheckpoint添加到集合中;
(6)CheckpointCoordinator遍历所有Task对于的Execution节点,RPC调用ask实例的notifyCheckpointComplete方法。

CheckpointCoordinator持久化CK元数据和算子状态

CheckpointCoordinator受到所有Task实例的ACK响应后,会调用PendingCheckpoint.finalizeCheckpoint将PendingCheckpoint转化为CompleteCheckpoint,并将CK的状态数据写到外部文件系统中。

public CompletedCheckpoint finalizeCheckpoint() throws IOException {

		synchronized (lock) {
		
			try {
				// write out the metadata
				final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
				final CompletedCheckpointStorageLocation finalizedLocation;

				try (CheckpointmetadataOutputStream out = targetLocation.createmetadataOutputStream()) {
					Checkpoints.storeCheckpointmetadata(savepoint, out);
					finalizedLocation = out.closeAndFinalizeCheckpoint();
				}

				CompletedCheckpoint completed = new CompletedCheckpoint(
						jobId,
						checkpointId,
						checkpointTimestamp,
						System.currentTimeMillis(),
						operatorStates,
						masterStates,
						props,
						finalizedLocation);


				return completed;
			}
			catch (Throwable t) {
				onCompletionPromise.completeExceptionally(t);
				ExceptionUtils.rethrowIOException(t);
				return null; // silence the compiler
			}
		}
	}
最终会调用SavepointV2Serializer 将状态序列化后,写到外部文件系统或内存中
public class SavepointV2Serializer implements SavepointSerializer {
	@Override
	public void serialize(SavepointV2 checkpointmetadata, DataOutputStream dos) throws IOException {
		// first: checkpoint ID
		dos.writeLong(checkpointmetadata.getCheckpointId());

		// second: master state
		final Collection masterStates = checkpointmetadata.getMasterStates();
		dos.writeInt(masterStates.size());
		for (MasterState ms : masterStates) {
			serializeMasterState(ms, dos);
		}

		// third: operator states
		Collection operatorStates = checkpointmetadata.getOperatorStates();
		dos.writeInt(operatorStates.size());

		for (OperatorState operatorState : operatorStates) {
			// Operator ID
			dos.writeLong(operatorState.getOperatorID().getLowerPart());
			dos.writeLong(operatorState.getOperatorID().getUpperPart());

			// Parallelism
			int parallelism = operatorState.getParallelism();
			dos.writeInt(parallelism);
			dos.writeInt(operatorState.getMaxParallelism());
			dos.writeInt(1);

			// Sub task states
			Map subtaskStateMap = operatorState.getSubtaskStates();
			dos.writeInt(subtaskStateMap.size());
			for (Map.Entry entry : subtaskStateMap.entrySet()) {
				dos.writeInt(entry.getKey());
				serializeSubtaskState(entry.getValue(), dos);
			}
		}
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706695.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存