- Getting-started example
- IcebergStreamWriter
- IcebergFilesCommitter
- Appendix: Flink task execution flow
- References
Flink supports writing both DataStream<RowData> and DataStream<Row> into Iceberg:
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input, Flink_SCHEMA)
    .tableLoader(tableLoader)
    .writeParallelism(1)
    .build();

env.execute("Test Iceberg DataStream");
```
Here input is the input stream (either DataStream<RowData> or DataStream<Row>), and Flink_SCHEMA is a TableSchema.
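For the DataStream<Row> variant, the builder exposes a forRow() entry point. Below is a minimal, self-contained sketch of that path; the warehouse path, field names, and schema are placeholders, and exact builder signatures may vary slightly across Iceberg versions:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class RowSinkExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // A tiny bounded input; in practice this would be a Kafka/CDC source.
    DataStream<Row> rows = env.fromElements(Row.of(1, "a"), Row.of(2, "b"))
        .returns(Types.ROW(Types.INT, Types.STRING));

    // Schema describing the Row fields; adjust to your table.
    TableSchema flinkSchema = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();

    TableLoader tableLoader = TableLoader.fromHadoopTable(
        "hdfs://nn:8020/warehouse/path", new Configuration());

    // forRow() converts Row to RowData internally, then follows the same
    // build() path analyzed below.
    FlinkSink.forRow(rows, flinkSchema)
        .tableLoader(tableLoader)
        .writeParallelism(1)
        .build();

    env.execute("Iceberg DataStream<Row> sink");
  }
}
```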
First, look at the build() method:
```java
public DataStreamSink<RowData> build() {
  Preconditions.checkArgument(this.rowDataInput != null,
      "Please use forRowData() to initialize the input DataStream.");
  Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");

  if (this.table == null) {
    this.tableLoader.open();
    try (TableLoader loader = this.tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException(
          "Failed to load iceberg table from table loader: " + this.tableLoader, e);
    }
  }

  // Resolve the requested equality columns into Iceberg field ids.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
    for (String column : this.equalityFieldColumns) {
      NestedField field = this.table.schema().findField(column);
      Preconditions.checkNotNull(field,
          "Missing required equality field column '%s' in table schema %s",
          column, this.table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }

  RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
  this.rowDataInput = this.distributeDataStream(
      this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);

  // The two core operators: the writer emits a WriteResult per checkpoint,
  // and the committer (parallelism 1) commits those results to the table.
  IcebergStreamWriter<RowData> streamWriter =
      FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

  this.writeParallelism = this.writeParallelism == null
      ? this.rowDataInput.getParallelism()
      : this.writeParallelism;

  DataStream<Void> returnStream = this.rowDataInput
      .transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(this.writeParallelism)
      .transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
      .setParallelism(1)
      .setMaxParallelism(1);

  return returnStream.addSink(new DiscardingSink())
      .name(String.format("IcebergSink %s", this.table.name()))
      .setParallelism(1);
}
```
This is where the two core Iceberg write operators, IcebergStreamWriter and IcebergFilesCommitter, are created.
IcebergStreamWriter

```java
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
```
Inside build(), createStreamWriter() is called to create the IcebergStreamWriter:
```java
static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
  Map<String, String> props = table.properties();
  long targetFileSize = getTargetFileSizeBytes(props);
  FileFormat fileFormat = getFileFormat(props);

  TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
      table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(),
      table.encryption(), targetFileSize, fileFormat, props, equalityFieldIds);

  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
```
A TaskWriterFactory is built from the table metadata and passed into the IcebergStreamWriter:
```java
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
    implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

  private static final long serialVersionUID = 1L;

  private final String fullTableName;
  private final TaskWriterFactory<T> taskWriterFactory;

  private transient TaskWriter<T> writer;
  private transient int subTaskId;
  private transient int attemptId;

  IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
    this.fullTableName = fullTableName;
    this.taskWriterFactory = taskWriterFactory;
    setChainingStrategy(ChainingStrategy.ALWAYS);
  }

  public void open() {
    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = getRuntimeContext().getAttemptNumber();

    // Initialize the factory for this subtask, then create the first writer.
    this.taskWriterFactory.initialize(subTaskId, attemptId);
    this.writer = taskWriterFactory.create();
  }

  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // Emit the completed files of the current writer before the barrier,
    // then start a fresh writer for the next checkpoint cycle.
    emit(writer.complete());
    this.writer = taskWriterFactory.create();
  }

  public void processElement(StreamRecord<T> element) throws Exception {
    writer.write(element.getValue());
  }
}
```
In open(), a TaskWriter is created from the injected taskWriterFactory:
```java
public TaskWriter<RowData> create() {
  Preconditions.checkNotNull(outputFileFactory,
      "The outputFileFactory shouldn't be null if we have invoked the initialize().");

  if (equalityFieldIds != null && !equalityFieldIds.isEmpty()) {
    // Delta writers: records may carry equality deletes on the given fields.
    return spec.isUnpartitioned()
        ? new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds)
        : new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
  } else {
    // Append-only writers.
    return spec.isUnpartitioned()
        ? new UnpartitionedWriter(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes)
        : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(
            spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes, schema, flinkSchema);
  }
}
```
Depending on whether equality field columns were specified and whether the table is partitioned, this method constructs one of four writers: with equality fields, UnpartitionedDeltaWriter or PartitionedDeltaWriter; without, UnpartitionedWriter or RowDataPartitionedFanoutWriter.
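It is the equalityFieldColumns builder option that routes create() into the delta-writer branch. A minimal sketch of opting in, reusing input and tableLoader from the opening example (the column name "id" is a placeholder and must exist in the table schema):

```java
// Sketch: naming equality columns turns on the delta writers
// (UnpartitionedDeltaWriter / PartitionedDeltaWriter) in create().
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .equalityFieldColumns(Lists.newArrayList("id"))  // becomes equalityFieldIds in build()
    .writeParallelism(1)
    .build();
```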
Call chains of the four writer classes:

With equality field columns:

UnpartitionedDeltaWriter -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()

PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> RollingFileWriter.write() -> appender.add()

Without equality field columns:

UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()

RowDataPartitionedFanoutWriter -> BaseRollingWriter.write() -> RollingFileWriter.write() -> appender.add()
In every case the bottom-level appender is the one created by the FlinkAppenderFactory that was passed in when the TaskWriter was constructed.
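All four chains bottom out in the same rolling pattern: write each record through the current file appender, and roll to a new data file once the target file size is reached. A simplified sketch of that idea (this is not the actual Iceberg class; names are shortened for illustration, and the real BaseRollingWriter also tracks row counts and partition keys):

```java
import java.io.IOException;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.io.FileAppender;

// Illustration only: the rolling behavior behind RollingFileWriter.write().
class RollingWriterSketch {
  private final long targetFileSizeBytes;
  private FileAppender<RowData> currentAppender; // created via FlinkAppenderFactory

  RollingWriterSketch(long targetFileSizeBytes, FileAppender<RowData> first) {
    this.targetFileSizeBytes = targetFileSizeBytes;
    this.currentAppender = first;
  }

  void write(RowData row) throws IOException {
    currentAppender.add(row);                      // the appender.add() from the chains above
    if (currentAppender.length() >= targetFileSizeBytes) {
      currentAppender.close();                     // seal the completed data file
      currentAppender = openNewAppender();         // start the next file
    }
  }

  private FileAppender<RowData> openNewAppender() {
    throw new UnsupportedOperationException("illustrative only");
  }
}
```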
processElement() calls writer.write(element.getValue()) to write each incoming record; the completed files are committed later, at checkpoint time.

Tip: a task executes in three stages: beforeInvoke() -> runMailboxLoop() -> afterInvoke().

beforeInvoke() calls open() and initializeState(); runMailboxLoop() drives processElement() to process the data.
IcebergFilesCommitter

```java
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
```
In build(), the IcebergFilesCommitter is created directly from the tableLoader and the overwrite flag.

Checkpoint initialization happens in IcebergFilesCommitter.initializeState():
```java
public void initializeState(StateInitializationContext context) throws Exception {
  super.initializeState(context);
  this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
  this.tableLoader.open();
  this.table = tableLoader.loadTable();

  int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
  int attemptId = getRuntimeContext().getAttemptNumber();
  this.manifestOutputFileFactory =
      FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, (long) attemptId);
  this.maxCommittedCheckpointId = -1L;

  this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
  this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);

  if (context.isRestored()) {
    String restoredFlinkJobId = jobIdState.get().iterator().next();
    Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
        "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");

    this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);

    NavigableMap<Long, byte[]> uncommittedDataFiles =
        Maps.newTreeMap(checkpointsState.get().iterator().next())
            .tailMap(maxCommittedCheckpointId, false);
    if (!uncommittedDataFiles.isEmpty()) {
      long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
      commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
    }
  }
}
```
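getMaxCommittedCheckpointId() is what makes restore idempotent: it walks the table's snapshot history looking for the flink.max-committed-checkpoint-id summary property that commitOperation() (shown later) stamps onto every commit. A hedged sketch of what that lookup amounts to:

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch based on the summary properties set in commitOperation(); the real
// private method in IcebergFilesCommitter walks parent snapshots similarly.
static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
  Snapshot snapshot = table.currentSnapshot();
  while (snapshot != null) {
    Map<String, String> summary = snapshot.summary();
    if (flinkJobId.equals(summary.get("flink.job-id"))) {
      String value = summary.get("flink.max-committed-checkpoint-id");
      if (value != null) {
        return Long.parseLong(value);
      }
    }
    Long parent = snapshot.parentId();
    snapshot = parent != null ? table.snapshot(parent) : null;
  }
  return -1L; // nothing committed by this job yet
}
```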
The per-checkpoint snapshot flow lives in IcebergFilesCommitter.snapshotState():
```java
public void snapshotState(StateSnapshotContext context) throws Exception {
  super.snapshotState(context);
  long checkpointId = context.getCheckpointId();
  LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}",
      table, checkpointId);

  this.dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
  this.checkpointsState.clear();
  this.checkpointsState.add(dataFilesPerCheckpoint);
  this.jobIdState.clear();
  this.jobIdState.add(flinkJobId);
  this.writeResultsOfCurrentCkpt.clear();
}
```
The line this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId)); records, for the current checkpointId, the serialized manifest metadata of everything written since the previous checkpoint.
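dataFilesPerCheckpoint is a NavigableMap keyed by checkpoint id, and the tailMap/headMap selections seen in initializeState() and commitUpToCheckpoint() rely on its ordering. A small self-contained demo of those two calls (the checkpoint ids are made up):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class CheckpointMapDemo {
  public static void main(String[] args) {
    NavigableMap<Long, byte[]> perCheckpoint = new TreeMap<>();
    perCheckpoint.put(1L, new byte[0]);
    perCheckpoint.put(2L, new byte[0]);
    perCheckpoint.put(3L, new byte[0]);

    long maxCommitted = 1L;
    // Restore path: everything strictly AFTER the last committed checkpoint.
    System.out.println(perCheckpoint.tailMap(maxCommitted, false).keySet()); // [2, 3]

    long notified = 3L;
    // Commit path: everything up to AND INCLUDING the notified checkpoint.
    System.out.println(perCheckpoint.headMap(notified, true).keySet());      // [1, 2, 3]
  }
}
```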
The call chain around dataFilesPerCheckpoint is as follows:
```java
private byte[] writeToManifest(long checkpointId) throws IOException {
  if (writeResultsOfCurrentCkpt.isEmpty()) {
    return EMPTY_MANIFEST_DATA;
  }

  WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
  DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
      result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());

  return SimpleVersionedSerialization.writeVersionAndSerialize(
      DeltaManifestsSerializer.INSTANCE, deltaManifests);
}
```
writeResultsOfCurrentCkpt contains the data files, delete files, and referenced data files. A DeltaManifests is created from this aggregated result, and the serialized manifest information is returned.

deltaManifests is produced as follows:
```java
static DeltaManifests writeCompletedFiles(WriteResult result,
    Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
  ManifestFile dataManifest = null;
  ManifestFile deleteManifest = null;

  if (result.dataFiles() != null && result.dataFiles().length > 0) {
    dataManifest = writeDataFiles(outputFileSupplier.get(), spec,
        Lists.newArrayList(result.dataFiles()));
  }

  if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
    OutputFile deleteManifestFile = outputFileSupplier.get();
    ManifestWriter<DeleteFile> deleteManifestWriter =
        ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
    try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
      for (DeleteFile deleteFile : result.deleteFiles()) {
        writer.add(deleteFile);
      }
    }
    deleteManifest = deleteManifestWriter.toManifestFile();
  }

  return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}
```
As this write path shows, the data files and delete files each get their own manifest file, and a DeltaManifests wrapping both is returned.
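Because Flink operator state can only hold bytes, the DeltaManifests is round-tripped through SimpleVersionedSerialization. A short sketch of that symmetry, using the same two calls that appear in writeToManifest() and commitUpToCheckpoint() (deltaManifests is assumed to come from writeCompletedFiles above):

```java
// Serialize for the checkpoint state; DeltaManifestsSerializer.INSTANCE
// handles versioning of the payload.
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(
    DeltaManifestsSerializer.INSTANCE, deltaManifests);

// Deserialize on restore / on checkpoint-complete.
DeltaManifests restored = SimpleVersionedSerialization.readVersionAndDeSerialize(
    DeltaManifestsSerializer.INSTANCE, bytes);
```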
Finally, when the checkpoint-complete notification arrives, the pending results are committed:
```java
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  if (checkpointId > maxCommittedCheckpointId) {
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  }
}
```
```java
private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
    String newFlinkJobId, long checkpointId) throws IOException {
  NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
  List<ManifestFile> manifests = Lists.newArrayList();
  NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

  for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
    if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
      continue; // Skip the empty flink manifest.
    }
    DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(
        DeltaManifestsSerializer.INSTANCE, e.getValue());
    pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
    manifests.addAll(deltaManifests.manifests());
  }

  if (replacePartitions) {
    replacePartitions(pendingResults, newFlinkJobId, checkpointId);
  } else {
    commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
  }
  pendingMap.clear();

  // The Iceberg transaction has been committed; the temporary flink
  // manifests can now be deleted (failure here is only logged).
  for (ManifestFile manifest : manifests) {
    try {
      table.io().deleteFile(manifest.path());
    } catch (Exception ex) {
      String details = MoreObjects.toStringHelper(this)
          .add("flinkJobId", newFlinkJobId)
          .add("checkpointId", checkpointId)
          .add("manifestPath", manifest.path())
          .toString();
      LOG.warn("The iceberg transaction has been committed, but we failed to clean the " +
          "temporary flink manifests: {}", details, ex);
    }
  }
}
```
Here the previously serialized bytes are deserialized back into DeltaManifests objects and collected into the manifests list.

The transaction is then committed according to replacePartitions (the overwrite flag passed at construction time, false by default); in the default case commitDeltaTxn() is called.
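The replacePartitions branch is only reachable when the sink was built with overwrite enabled. A sketch of opting in, on the same builder as the opening example:

```java
// Sketch: overwrite(true) makes notifyCheckpointComplete() call
// replacePartitions() instead of commitDeltaTxn().
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .build();
```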
```java
private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults,
    String newFlinkJobId, long checkpointId) {
  int deleteFilesNum = pendingResults.values().stream()
      .mapToInt(r -> r.deleteFiles().length)
      .sum();

  if (deleteFilesNum == 0) {
    // Pure append: merge all pending data files into a single AppendFiles commit.
    AppendFiles appendFiles = table.newAppend();
    int numFiles = 0;
    for (WriteResult result : pendingResults.values()) {
      Preconditions.checkState(result.referencedDataFiles().length == 0,
          "Should have no referenced data files.");
      numFiles += result.dataFiles().length;
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }
    commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
  } else {
    // There are delete files: commit one RowDelta per checkpoint, in order.
    for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
      WriteResult result = e.getValue();
      RowDelta rowDelta = table.newRowDelta()
          .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
          .validateDeletedFiles();

      int numDataFiles = result.dataFiles().length;
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);

      int numDeleteFiles = result.deleteFiles().length;
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);

      commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
    }
  }
}
```
Either a rowDelta (its implementation class BaseRowDelta extends MergingSnapshotProducer) or an appendFiles (implementation class MergeAppend, which also extends MergingSnapshotProducer) is created, and the result is committed as a new snapshot.
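Stripped of the Flink bookkeeping, the two branches are plain Iceberg table-API calls. A hedged sketch of each, where dataFile and deleteFile stand for already-written DataFile/DeleteFile instances:

```java
// Append-only branch: one merged append for all pending data files.
AppendFiles append = table.newAppend();
append.appendFile(dataFile);        // repeated for every DataFile
append.commit();

// Delta branch: data and delete files committed together per checkpoint.
RowDelta delta = table.newRowDelta();
delta.addRows(dataFile);            // rowDelta::addRows
delta.addDeletes(deleteFile);       // rowDelta::addDeletes
delta.commit();
```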
```java
private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles,
    String description, String newFlinkJobId, long checkpointId) {
  LOG.info("Committing {} with {} data files and {} delete files to table {}",
      description, numDataFiles, numDeleteFiles, table);
  operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
  operation.set("flink.job-id", newFlinkJobId);

  long start = System.currentTimeMillis();
  operation.commit();
  long duration = System.currentTimeMillis() - start;
  LOG.info("Committed in {} ms", duration);
}
```
operation.commit() invokes the commit() method of SnapshotProducer:
```java
public void commit() {
  AtomicLong newSnapshotId = new AtomicLong(-1L);
  try {
    Tasks.foreach(ops)
        .retry(base.propertyAsInt("commit.retry.num-retries", 4))
        .exponentialBackoff(
            base.propertyAsInt("commit.retry.min-wait-ms", 100),
            base.propertyAsInt("commit.retry.max-wait-ms", 60000),
            base.propertyAsInt("commit.retry.total-timeout-ms", 1800000),
            2.0)
        .onlyRetryOn(CommitFailedException.class)
        .run(taskOps -> {
          Snapshot newSnapshot = apply();
          newSnapshotId.set(newSnapshot.snapshotId());
          TableMetadata updated;
          if (stageOnly) {
            updated = base.addStagedSnapshot(newSnapshot);
          } else {
            updated = base.replaceCurrentSnapshot(newSnapshot);
          }
          if (updated != base) {
            taskOps.commit(base, updated.withUUID());
          }
        });
  } catch (RuntimeException e) {
    Exceptions.suppressAndThrow(e, this::cleanAll);
  }

  LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

  try {
    Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
    if (saved != null) {
      cleanUncommitted(Sets.newHashSet(saved.allManifests()));
      for (String manifestList : manifestLists) {
        if (!saved.manifestListLocation().equals(manifestList)) {
          deleteFile(manifestList);
        }
      }
    } else {
      LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
    }
  } catch (RuntimeException e) {
    LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
  }

  notifyListeners();
}
```
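The retry loop is driven entirely by table properties, with the defaults visible above (4 retries, 100 ms minimum wait, and so on). A sketch of raising the retry budget for a contended table, using the standard property-update API:

```java
// Sketch: the property keys are the same ones read by
// SnapshotProducer.commit() above.
table.updateProperties()
    .set("commit.retry.num-retries", "8")
    .set("commit.retry.min-wait-ms", "200")
    .commit();
```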
SnapshotProducer.apply() writes the manifest-list data and returns the new snapshot:
```java
public Snapshot apply() {
  this.base = refresh();
  Long parentSnapshotId = base.currentSnapshot() != null
      ? base.currentSnapshot().snapshotId()
      : null;
  long sequenceNumber = base.nextSequenceNumber();
  validate(base);
  List<ManifestFile> manifests = apply(base);

  if (base.formatVersion() <= 1 && !base.propertyAsBoolean("write.manifest-lists.enabled", true)) {
    return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId,
        System.currentTimeMillis(), operation(), summary(base), manifests);
  }

  OutputFile manifestList = manifestListPath();
  try (ManifestListWriter writer = ManifestLists.write(
      ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {
    manifestLists.add(manifestList.location());

    ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
    Tasks.range(manifestFiles.length)
        .stopOnFailure()
        .throwFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));

    writer.addAll(Arrays.asList(manifestFiles));
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to write manifest list file");
  }

  return new BaseSnapshot(ops.io(), sequenceNumber, snapshotId(), parentSnapshotId,
      System.currentTimeMillis(), operation(), summary(base), manifestList.location());
}
```
Next, the updated table metadata is generated:
```java
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
  if (snapshotsById.containsKey(snapshot.snapshotId())) {
    return setCurrentSnapshotTo(snapshot);
  }

  ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
      "Cannot add snapshot with sequence number %s older than last sequence number %s",
      snapshot.sequenceNumber(), lastSequenceNumber);

  List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
      .addAll(snapshots)
      .add(snapshot)
      .build();
  List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
      .addAll(snapshotLog)
      .add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
      .build();

  return new TableMetadata((InputFile) null, formatVersion, uuid, location,
      snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
      schema, defaultSpecId, specs, defaultSortOrderId, sortOrders, properties,
      snapshot.snapshotId(), newSnapshots, newSnapshotLog,
      addPreviousFile(file, lastUpdatedMillis));
}
```
This in turn calls commit() on BaseMetastoreTableOperations:
```java
public void commit(TableMetadata base, TableMetadata metadata) {
  if (base != current()) {
    throw new CommitFailedException("Cannot commit: stale table metadata");
  }
  if (base == metadata) {
    LOG.info("Nothing to commit.");
    return;
  }

  long start = System.currentTimeMillis();
  doCommit(base, metadata);
  deleteRemovedMetadataFiles(base, metadata);
  requestRefresh();
  LOG.info("Successfully committed to table {} in {} ms",
      tableName(), System.currentTimeMillis() - start);
}
```
Finally, HiveTableOperations.doCommit() performs the actual commit:
```java
protected void doCommit(TableMetadata base, TableMetadata metadata) {
  String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
  boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);

  boolean threw = true;
  boolean updateHiveTable = false;
  Optional<Long> lockId = Optional.empty();
  try {
    lockId = Optional.of(acquireLock());
    Table tbl = loadHmsTable();

    if (tbl != null) {
      // If we try to create the table but the metadata location is already
      // set, then we had a concurrent commit.
      if (base == null && tbl.getParameters().get("metadata_location") != null) {
        throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
      }
      updateHiveTable = true;
      LOG.debug("Committing existing table: {}", fullName);
    } else {
      tbl = newHmsTable();
      LOG.debug("Committing new table: {}", fullName);
    }

    tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled));

    // Optimistic concurrency: the commit only succeeds if the metadata
    // location recorded in the HMS still matches the base we started from.
    String metadataLocation = tbl.getParameters().get("metadata_location");
    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
    if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
      throw new CommitFailedException(
          "Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
          baseMetadataLocation, metadataLocation, database, tableName);
    }

    setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
    persistTable(tbl, updateHiveTable);
    threw = false;
  } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
    throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
  } catch (UnknownHostException | TException e) {
    if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
      throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't " +
          "exist, this probably happened when using embedded metastore or doesn't create a " +
          "transactional meta table. To fix this, use an alternative metastore", e);
    }
    throw new RuntimeException(String.format("Metastore operation failed for %s.%s", database, tableName), e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted during commit", e);
  } finally {
    cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
  }
}
```

Appendix: Flink task execution flow
The task lifecycle:

StreamTask is the base class for all stream tasks. A task runs one or more StreamOperators (when they are chained). Chained operators run synchronously in the same thread.

Execution flow:
```java
@Override
public final void invoke() throws Exception {
  try {
    beforeInvoke();

    // final check to exit early before starting to run
    if (canceled) {
      throw new CancelTaskException();
    }

    // let the task do its work
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    if (canceled) {
      throw new CancelTaskException();
    }

    afterInvoke();
  } catch (Exception invokeException) {
    try {
      cleanUpInvoke();
    } catch (Throwable cleanUpException) {
      throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
    }
    throw invokeException;
  }
  cleanUpInvoke();
}
```
beforeInvoke() performs initialization work, including extracting all operators of the chain.

runMailboxLoop() drives the task's actual processing.

afterInvoke() wraps the task up.

Call hierarchy:
```
invoke()
  |
  +----> Create basic utils (config, etc) and load the chain of operators
  +----> operators.setup()
  +----> task specific init()
  +----> initialize-operator-states()
  +----> open-operators()
  +----> run()
           --------------> mailboxProcessor.runMailboxLoop();
           --------------> StreamTask.processInput()
           --------------> StreamTask.inputProcessor.processInput()
           --------------> indirectly invokes the operators' processElement() and processWatermark()
  +----> close-operators()
  +----> dispose-operators()
  +----> common cleanup
  +----> task specific cleanup()
```
- Create the state backend, which provides state for all operators in the OperatorChain
- Load all operators of the OperatorChain
- Call setup() on every operator
- Task-specific initialization (init())
- Call initializeState() on every operator to initialize its state
- Call open() on every operator
- The run() method loops over the input data
- Call close() on every operator
- Call dispose() on every operator
- Common cleanup
- Task-specific cleanup
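The same hook order applies to any custom operator, including the two Iceberg operators analyzed above. A minimal pass-through sketch showing where each lifecycle step lands (names are illustrative):

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Minimal sketch of the hooks invoked by the flow above:
// setup() -> initializeState() -> open() -> processElement()* -> close().
public class PassThroughOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  @Override
  public void open() throws Exception {
    super.open();
    // open-operators(): create writers, counters, etc.
    // (IcebergStreamWriter builds its TaskWriter here.)
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    // run(): called once per record from the mailbox loop.
    output.collect(element);
  }

  @Override
  public void close() throws Exception {
    // close-operators(): flush and release resources.
    super.close();
  }
}
```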
References
- https://iceberg.apache.org/#flink/
- https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/