Iceberg source code study: how Flink writes to Iceberg


Contents
  • Getting started example
  • IcebergStreamWriter
  • IcebergFilesCommitter
  • Appendix: Flink task execution flow
  • References

Getting started example

Flink supports writing both DataStream<RowData> and DataStream<Row> into Iceberg:

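StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

// forRowData() takes only the stream; the Flink TableSchema is passed through tableSchema().
// For a DataStream<Row>, use FlinkSink.forRow(input, Flink_SCHEMA) instead.
FlinkSink.forRowData(input)
    .tableSchema(Flink_SCHEMA)
    .tableLoader(tableLoader)
    .writeParallelism(1)
    .build();

env.execute("Test Iceberg DataStream");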

input is the input stream, of type DataStream<RowData> or DataStream<Row>, and Flink_SCHEMA is a Flink TableSchema.

First, let's look at the build() method:

public DataStreamSink build() {
    Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
    Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");

    // Load the table once; try-with-resources closes the loader afterwards.
    if (this.table == null) {
        this.tableLoader.open();
        try (TableLoader loader = this.tableLoader) {
            this.table = loader.loadTable();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
        }
    }

    // Resolve the user-supplied equality column names to Iceberg field ids.
    List<Integer> equalityFieldIds = Lists.newArrayList();
    if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
        for (String column : this.equalityFieldColumns) {
            NestedField field = this.table.schema().findField(column);
            Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
            equalityFieldIds.add(field.fieldId());
        }
    }

    // Convert the requested Flink schema to a RowType, then distribute records by write.distribution-mode.
    RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
    this.rowDataInput = this.distributeDataStream(this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);

    // The two core operators: a parallel writer and a single committer.
    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
    IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

    this.writeParallelism = this.writeParallelism == null ? this.rowDataInput.getParallelism() : this.writeParallelism;

    DataStream<Void> returnStream = this.rowDataInput
        .transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
        .setParallelism(this.writeParallelism)
        .transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
        .setParallelism(1)
        .setMaxParallelism(1);

    // A discarding sink terminates the topology; the committer does the real work.
    return returnStream.addSink(new DiscardingSink())
        .name(String.format("IcebergSink %s", this.table.name()))
        .setParallelism(1);
}

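This is where the two core Iceberg write operators, IcebergStreamWriter and IcebergFilesCommitter, are created. The writer runs with writeParallelism, while the committer is pinned to parallelism 1, so a single task commits each checkpoint to the table.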

IcebergStreamWriter
IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);

In build(), createStreamWriter() is called to create the IcebergStreamWriter:

static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
        Map<String, String> props = table.properties();
        // Read from the table properties write.target-file-size-bytes and write.format.default.
        long targetFileSize = getTargetFileSizeBytes(props);
        FileFormat fileFormat = getFileFormat(props);
        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props, equalityFieldIds);
        return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
    }

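A TaskWriterFactory is built from the table's schema, partition spec, location provider, FileIO, encryption manager, target file size, file format and equality field ids, and is passed to IcebergStreamWriter: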

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final String fullTableName;
    private final TaskWriterFactory<T> taskWriterFactory;
    private transient TaskWriter<T> writer;
    private transient int subTaskId;
    private transient int attemptId;

    IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
        this.fullTableName = fullTableName;
        this.taskWriterFactory = taskWriterFactory;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    @Override
    public void open() {
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getAttemptNumber();
        // Initialize the factory for this subtask/attempt, then create the first task writer.
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
        this.writer = this.taskWriterFactory.create();
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Close the current files, send their WriteResult downstream before the barrier,
        // and start a fresh writer for the next checkpoint.
        this.emit(this.writer.complete());
        this.writer = this.taskWriterFactory.create();
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        this.writer.write(element.getValue());
    }

    // endInput(), close(), emit() and other members omitted.
}

In open(), a TaskWriter is created through the injected taskWriterFactory. RowDataTaskWriterFactory.create() looks like this:

public TaskWriter<RowData> create() {
        Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");
        if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
            // Equality fields given: write both INSERTs and equality DELETEs.
            if (this.spec.isUnpartitioned()) {
                return new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
            }
            return new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
        } else {
            // No equality fields: INSERT-only writers.
            if (this.spec.isUnpartitioned()) {
                return new UnpartitionedWriter<>(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes);
            }
            return new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema);
        }
    }

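Depending on whether equality field columns were specified and whether the table is partitioned, this method creates a partitioned writer (PartitionedDeltaWriter / RowDataPartitionedFanoutWriter) or an unpartitioned writer (UnpartitionedDeltaWriter / UnpartitionedWriter).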
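
The branching above boils down to two yes/no questions. The following Java sketch only models that decision: WriterSelection is a hypothetical helper, not part of Iceberg, that returns the simple name of the writer class RowDataTaskWriterFactory.create() would instantiate.

```java
import java.util.List;

// Hypothetical helper (not an Iceberg class): mirrors the branching in
// RowDataTaskWriterFactory.create() and returns the chosen writer's simple name.
final class WriterSelection {
    private WriterSelection() {}

    static String choose(List<Integer> equalityFieldIds, boolean unpartitioned) {
        boolean hasEqualityFields = equalityFieldIds != null && !equalityFieldIds.isEmpty();
        if (hasEqualityFields) {
            // INSERT + equality DELETE: delta writers produce data files and delete files.
            return unpartitioned ? "UnpartitionedDeltaWriter" : "PartitionedDeltaWriter";
        }
        // INSERT only: plain writers produce data files.
        return unpartitioned ? "UnpartitionedWriter" : "RowDataPartitionedFanoutWriter";
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of(), true));   // UnpartitionedWriter
        System.out.println(choose(List.of(1), false)); // PartitionedDeltaWriter
    }
}
```
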
Call chains of the four classes:

With equality fields:

UnpartitionedDeltaWriter -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()
PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> RollingFileWriter.write() -> appender.add()

Without equality fields:

UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()
RowDataPartitionedFanoutWriter -> BaseRollingWriter.write() -> RollingFileWriter.write() -> appender.add()

The appender at the bottom of each chain is created by the FlinkAppenderFactory that was passed in when the TaskWriter was constructed.
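
All four chains end in RollingFileWriter, which closes the current file and opens a new one once the file reaches the target size (getTargetFileSizeBytes() in createStreamWriter()). The sketch below models only that rolling rule. RollingSketch is hypothetical, records are strings, and string length stands in for the appender's file length. Iceberg's own rolling writer checks the size only every so many rows rather than after every record, so treat this as a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of size-based rolling: each "file" is a list of records,
// and the current file is closed as soon as it reaches targetFileSize "bytes".
final class RollingSketch {
    private final long targetFileSize;
    private final List<List<String>> completedFiles = new ArrayList<>();
    private List<String> currentFile = new ArrayList<>();
    private long currentBytes = 0;

    RollingSketch(long targetFileSize) {
        this.targetFileSize = targetFileSize;
    }

    void write(String record) {
        currentFile.add(record);
        currentBytes += record.length(); // stand-in for the appender's current file length
        if (currentBytes >= targetFileSize) {
            closeCurrent(); // roll: the next record goes into a new file
        }
    }

    // Called when the task writer completes (e.g. before a checkpoint barrier).
    List<List<String>> complete() {
        if (!currentFile.isEmpty()) {
            closeCurrent();
        }
        return completedFiles;
    }

    private void closeCurrent() {
        completedFiles.add(currentFile);
        currentFile = new ArrayList<>();
        currentBytes = 0;
    }
}
```

With a target of 8, writing "aaaa", "bbbb" and "cc" yields two files: ["aaaa", "bbbb"] and ["cc"].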

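processElement() calls writer.write(element.getValue()) for every incoming record. Before each checkpoint barrier is forwarded, prepareSnapshotPreBarrier() completes the current writer and emits its WriteResult (the finished data and delete files) to the committer, which commits them once the checkpoint succeeds.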
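
To make this per-checkpoint cycle concrete, here is a hypothetical, dependency-free model of IcebergStreamWriter. A list of strings plays the role of the TaskWriter, and the emitted lists play the role of the WriteResults sent to IcebergFilesCommitter. As in the code shown earlier, a result is emitted at every barrier, even an empty one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Iceberg code) of IcebergStreamWriter's checkpoint cycle.
final class StreamWriterSketch {
    private final List<List<String>> emitted = new ArrayList<>(); // what the committer would receive
    private List<String> writer = new ArrayList<>();              // stand-in for the TaskWriter

    void processElement(String value) {
        writer.add(value); // writer.write(element.getValue())
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        emitted.add(writer);        // emit(writer.complete())
        writer = new ArrayList<>(); // writer = taskWriterFactory.create()
    }

    List<List<String>> emitted() {
        return emitted;
    }
}
```
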

Tip: a task runs in three phases: beforeInvoke() -> runMailboxLoop() -> afterInvoke().
beforeInvoke() calls initializeState() and open(); runMailboxLoop() calls processElement() to process the data.

IcebergFilesCommitter
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

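In build(), IcebergFilesCommitter is created directly from tableLoader and overwrite.
Checkpoint state is initialized in IcebergFilesCommitter.initializeState(). When the job is restored, any data files that were checkpointed but never committed to the table are committed here: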

public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        int attemptId = this.getRuntimeContext().getAttemptNumber();
        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, (long) attemptId);
        this.maxCommittedCheckpointId = -1L;
        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
        if (context.isRestored()) {
            String restoredFlinkJobId = this.jobIdState.get().iterator().next();
            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
            // Highest checkpoint id already committed by that job, read from the table's snapshot summaries.
            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);
            // Checkpoints after it were stored in state but never committed to the table.
            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps.newTreeMap(this.checkpointsState.get().iterator().next()).tailMap(this.maxCommittedCheckpointId, false);
            if (!uncommittedDataFiles.isEmpty()) {
                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
                this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
            }
        }
    }
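
The restore branch relies on NavigableMap.tailMap(key, false), which keeps only the checkpoints strictly after the last one recorded in the table. Below is a minimal sketch of that selection, assuming the restored state is a map from checkpoint id to serialized manifest bytes. RestoreSketch is a hypothetical name, not an Iceberg class.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the restore selection in initializeState().
final class RestoreSketch {
    private RestoreSketch() {}

    // Returns the checkpoints that were snapshotted into state but not yet committed to the table.
    static NavigableMap<Long, byte[]> uncommitted(NavigableMap<Long, byte[]> restored, long maxCommittedCheckpointId) {
        // Copy first (like Maps.newTreeMap(...)), then keep keys strictly greater than the committed id.
        return new TreeMap<>(restored).tailMap(maxCommittedCheckpointId, false);
    }
}
```

Passing -1, the initial value used in initializeState(), selects every restored checkpoint; commitUpToCheckpoint() is then called with the largest remaining key (lastKey()).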

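When a checkpoint is taken, IcebergFilesCommitter.snapshotState() writes the WriteResults received for that checkpoint into a temporary manifest and stores it, together with the Flink job id, in operator state: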

public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        long checkpointId = context.getCheckpointId();
        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);
        this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
        this.checkpointsState.clear();
        this.checkpointsState.add(this.dataFilesPerCheckpoint);
        this.jobIdState.clear();
        this.jobIdState.add(this.flinkJobId);
        this.writeResultsOfCurrentCkpt.clear();
    }

The line this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId)); stores the serialized manifest metadata under the current checkpointId.
writeToManifest(), which produces the value stored in dataFilesPerCheckpoint, is:

private byte[] writeToManifest(long checkpointId) throws IOException {
        if (this.writeResultsOfCurrentCkpt.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        } else {
            WriteResult result = WriteResult.builder().addAll(this.writeResultsOfCurrentCkpt).build();
            DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> {
                return this.manifestOutputFileFactory.create(checkpointId);
            }, this.table.spec());
            return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
        }
    }

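writeResultsOfCurrentCkpt holds the data files, delete files and referenced data files received from the writers since the last checkpoint. They are merged into a single WriteResult and written out as DeltaManifests, and the serialized manifest information is returned. If nothing was received, the EMPTY_MANIFEST_DATA placeholder is returned instead.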
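
The returned bytes come from SimpleVersionedSerialization.writeVersionAndSerialize(), which prefixes the serializer's output with the serializer version so that readVersionAndDeSerialize() can later hand the right version to the deserializer. The sketch below reproduces that idea with a plain ByteBuffer. The exact layout (4-byte version, 4-byte length, then the payload, big-endian) is my assumption about Flink's framing, and VersionedFrame is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical "version + length + payload" framing, modeled on (not copied from)
// Flink's SimpleVersionedSerialization. ByteBuffer is big-endian by default.
final class VersionedFrame {
    private VersionedFrame() {}

    static byte[] write(int version, byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putInt(version)        // bytes 0-3: serializer version
                .putInt(payload.length) // bytes 4-7: payload length
                .put(payload)           // bytes 8..: serialized data
                .array();
    }

    static int version(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0);
    }

    static byte[] payload(byte[] frame) {
        int length = ByteBuffer.wrap(frame).getInt(4);
        return Arrays.copyOfRange(frame, 8, 8 + length);
    }
}
```

Storing the version next to the data is what lets a newer serializer still read manifests checkpointed by an older job.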

DeltaManifests is produced by FlinkManifestUtil.writeCompletedFiles():

static DeltaManifests writeCompletedFiles(WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
        ManifestFile dataManifest = null;
        ManifestFile deleteManifest = null;

        // Completed data files go into a new data manifest.
        if (result.dataFiles() != null && result.dataFiles().length > 0) {
            dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
        }

        // Completed delete files go into a separate delete manifest (format version 2).
        if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
            OutputFile deleteManifestFile = outputFileSupplier.get();
            ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
            try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
                for (DeleteFile deleteFile : result.deleteFiles()) {
                    writer.add(deleteFile);
                }
            }

            deleteManifest = deleteManifestWriter.toManifestFile();
        }

        return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
    }

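As the code shows, the data files and the delete files are each written into their own manifest file, and both manifests, plus the referenced data files, are wrapped into the returned DeltaManifests.
Finally, once Flink notifies the operator that the checkpoint has completed, the committer commits it to the Iceberg table: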

public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // Notifications can arrive out of order; an older id is already covered by a newer commit.
        if (checkpointId > this.maxCommittedCheckpointId) {
            this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
            this.maxCommittedCheckpointId = checkpointId;
        }
    }

    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
        // All pending checkpoints up to and including checkpointId.
        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
        List<ManifestFile> manifests = Lists.newArrayList();
        NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

        for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
            if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
                continue; // skip checkpoints that produced no files
            }
            DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
            pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
            manifests.addAll(deltaManifests.manifests());
        }

        if (this.replacePartitions) {
            this.replacePartitions(pendingResults, newFlinkJobId, checkpointId);
        } else {
            this.commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
        }

        // headMap() is a view, so this also removes the committed entries from deltaManifestsMap.
        pendingMap.clear();

        // The temporary Flink manifests are no longer needed once the Iceberg transaction is committed.
        for (ManifestFile manifest : manifests) {
            try {
                this.table.io().deleteFile(manifest.path());
            } catch (Exception e) {
                String details = MoreObjects.toStringHelper(this).add("flinkJobId", newFlinkJobId).add("checkpointId", checkpointId).add("manifestPath", manifest.path()).toString();
                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, e);
            }
        }
    }

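Here the previously serialized bytes are deserialized back into DeltaManifests. The WriteResult of each checkpoint is read into pendingResults, and the temporary manifest files are added to the manifests list so that they can be deleted after the commit.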
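
The bookkeeping in notifyCheckpointComplete() and commitUpToCheckpoint() can be modeled with a TreeMap. In this hypothetical sketch (CommitBookkeeping is not an Iceberg class), byte arrays stand in for serialized DeltaManifests and an empty array plays the role of EMPTY_MANIFEST_DATA. Instead of committing, it returns the checkpoint ids whose manifests would be committed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the committer's checkpoint bookkeeping.
final class CommitBookkeeping {
    static final byte[] EMPTY = new byte[0]; // stand-in for EMPTY_MANIFEST_DATA
    final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = new TreeMap<>();
    long maxCommittedCheckpointId = -1L;

    List<Long> notifyCheckpointComplete(long checkpointId) {
        List<Long> committed = new ArrayList<>();
        if (checkpointId <= maxCommittedCheckpointId) {
            return committed; // already covered by a newer commit
        }
        // Live view of all pending checkpoints up to and including checkpointId.
        NavigableMap<Long, byte[]> pending = dataFilesPerCheckpoint.headMap(checkpointId, true);
        for (Map.Entry<Long, byte[]> e : pending.entrySet()) {
            if (!Arrays.equals(EMPTY, e.getValue())) {
                committed.add(e.getKey()); // this manifest would go into the Iceberg commit
            }
        }
        pending.clear(); // clearing the view also removes the entries from dataFilesPerCheckpoint
        maxCommittedCheckpointId = checkpointId;
        return committed;
    }
}
```

Because headMap(checkpointId, true) is a live view, clearing it also empties those entries in dataFilesPerCheckpoint. And because checkpointId must exceed maxCommittedCheckpointId, a late notification for an older checkpoint is a no-op: its files were already committed together with the newer one.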

The transaction is then committed according to replacePartitions (the overwrite flag passed in at construction, false by default). By default, commitDeltaTxn() is called:

    private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
        if (deleteFilesNum == 0) {
            // Append-only: all pending checkpoints go into a single AppendFiles (also valid for format v1).
            AppendFiles appendFiles = this.table.newAppend();
            int numFiles = 0;
            for (WriteResult result : pendingResults.values()) {
                Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
                numFiles += result.dataFiles().length;
                Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
            }

            this.commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
        } else {
            // With delete files (format v2), each checkpoint is committed as its own RowDelta, in order,
            // so that equality deletes of a later checkpoint still apply to data files of an earlier one.
            for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
                WriteResult result = e.getValue();
                RowDelta rowDelta = this.table.newRowDelta().validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())).validateDeletedFiles();
                int numDataFiles = result.dataFiles().length;
                Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
                int numDeleteFiles = result.deleteFiles().length;
                Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
                this.commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
            }
        }
    }

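commitDeltaTxn() creates either one AppendFiles (appendFiles, when there are no delete files) or one RowDelta per checkpoint (rowDelta). The RowDelta implementation is BaseRowDelta, which extends MergingSnapshotProducer and is committed as a new snapshot. table.newAppend() returns a MergeAppend, which also extends MergingSnapshotProducer.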
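
The choice between one append and several row deltas can be modeled without Iceberg. In this hypothetical sketch, each pending checkpoint is reduced to its data-file and delete-file counts, and the result describes the snapshot operations that commitDeltaTxn() would commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical model of the decision in commitDeltaTxn(); strings describe the commits.
final class CommitPlan {
    static final class Pending {
        final int dataFiles;
        final int deleteFiles;

        Pending(int dataFiles, int deleteFiles) {
            this.dataFiles = dataFiles;
            this.deleteFiles = deleteFiles;
        }
    }

    private CommitPlan() {}

    static List<String> plan(NavigableMap<Long, Pending> pendingResults) {
        int deleteFilesNum = pendingResults.values().stream().mapToInt(p -> p.deleteFiles).sum();
        List<String> ops = new ArrayList<>();
        if (deleteFilesNum == 0) {
            // No deletes anywhere: one append covering every pending checkpoint.
            int numFiles = pendingResults.values().stream().mapToInt(p -> p.dataFiles).sum();
            ops.add("append(" + numFiles + " data files)");
        } else {
            // Deletes present: one row delta per checkpoint, in checkpoint order.
            for (Map.Entry<Long, Pending> e : pendingResults.entrySet()) {
                ops.add("rowDelta@" + e.getKey() + "(" + e.getValue().dataFiles + " data, "
                        + e.getValue().deleteFiles + " delete)");
            }
        }
        return ops;
    }
}
```

Committing one RowDelta per checkpoint matters because, in format v2, an equality delete applies only to data files with a smaller sequence number. Merging several checkpoints into one commit would give their data and deletes the same sequence number, and deletes from a later checkpoint would no longer remove rows written by an earlier one.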

    private void commitOperation(SnapshotUpdate operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
        LOG.info("Committing {} with {} data files and {} delete files to table {}", new Object[]{description, numDataFiles, numDeleteFiles, this.table});
        operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
        operation.set("flink.job-id", newFlinkJobId);
        long start = System.currentTimeMillis();
        operation.commit();
        long duration = System.currentTimeMillis() - start;
        LOG.info("Committed in {} ms", duration);
    }

operation.commit() invokes SnapshotProducer.commit():

    public void commit() {
        AtomicLong newSnapshotId = new AtomicLong(-1L);

        try {
            Tasks.foreach(new TableOperations[]{this.ops})
                .retry(this.base.propertyAsInt("commit.retry.num-retries", 4))
                .exponentialBackoff(
                    (long) this.base.propertyAsInt("commit.retry.min-wait-ms", 100),
                    (long) this.base.propertyAsInt("commit.retry.max-wait-ms", 60000),
                    (long) this.base.propertyAsInt("commit.retry.total-timeout-ms", 1800000),
                    2.0D)
                // Only commit conflicts are retried; apply() re-reads the latest metadata on each attempt.
                .onlyRetryOn(CommitFailedException.class)
                .run((taskOps) -> {
                Snapshot newSnapshot = this.apply();
                newSnapshotId.set(newSnapshot.snapshotId());
                TableMetadata updated;
                if (this.stageOnly) {
                    updated = this.base.addStagedSnapshot(newSnapshot);
                } else {
                    updated = this.base.replaceCurrentSnapshot(newSnapshot);
                }

                if (updated != this.base) {
                    taskOps.commit(this.base, updated.withUUID());
                }
            });
        } catch (RuntimeException var5) {
            Exceptions.suppressAndThrow(var5, this::cleanAll);
        }

        LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), this.getClass().getSimpleName());

        try {
            Snapshot saved = this.ops.refresh().snapshot(newSnapshotId.get());
            if (saved != null) {
                this.cleanUncommitted(Sets.newHashSet(saved.allManifests()));
                Iterator var3 = this.manifestLists.iterator();

                while(var3.hasNext()) {
                    String manifestList = (String)var3.next();
                    if (!saved.manifestListLocation().equals(manifestList)) {
                        this.deleteFile(manifestList);
                    }
                }
            } else {
                LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
            }
        } catch (RuntimeException var6) {
            LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", var6);
        }

        this.notifyListeners();
    }
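
commit() wraps apply() and the metadata swap in a retry loop that retries only on CommitFailedException and waits longer after each conflict. The sketch below computes that wait schedule from the settings read in commit() (commit.retry.num-retries, commit.retry.min-wait-ms, commit.retry.max-wait-ms and the 2.0 scale factor). It is a simplified model: BackoffSketch is hypothetical, and it ignores commit.retry.total-timeout-ms and any jitter Iceberg's Tasks utility may add.

```java
// Hypothetical model of a capped exponential backoff schedule (no jitter, no total timeout).
final class BackoffSketch {
    private BackoffSketch() {}

    // Wait (in ms) before retry 1..numRetries: minWaitMs * scale^(attempt - 1), capped at maxWaitMs.
    static long[] delays(int numRetries, long minWaitMs, long maxWaitMs, double scale) {
        long[] out = new long[numRetries];
        for (int attempt = 1; attempt <= numRetries; attempt++) {
            out[attempt - 1] = (long) Math.min(minWaitMs * Math.pow(scale, attempt - 1), maxWaitMs);
        }
        return out;
    }
}
```

With the defaults (4 retries, 100 ms, 60 000 ms, 2.0) the waits are 100, 200, 400 and 800 ms, far below the cap, so a commit that keeps conflicting gives up after a few seconds.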

SnapshotProducer.apply() collects the manifests of the new snapshot, writes them into a manifest list file, and returns the resulting Snapshot:

    public Snapshot apply() {
        this.base = this.refresh();
        Long parentSnapshotId = this.base.currentSnapshot() != null ? this.base.currentSnapshot().snapshotId() : null;
        long sequenceNumber = this.base.nextSequenceNumber();
        this.validate(this.base);
        // The subclass (e.g. MergeAppend, BaseRowDelta) produces the manifests of the new snapshot.
        List<ManifestFile> manifests = this.apply(this.base);
        if (this.base.formatVersion() <= 1 && !this.base.propertyAsBoolean("write.manifest-lists.enabled", true)) {
            // Legacy v1 layout: the manifests are listed directly in the table metadata.
            return new BaseSnapshot(this.ops.io(), this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifests);
        } else {
            OutputFile manifestList = this.manifestListPath();
            try (ManifestListWriter writer = ManifestLists.write(this.ops.current().formatVersion(), manifestList, this.snapshotId(), parentSnapshotId, sequenceNumber)) {
                // Track every manifest list written, so commit() can delete the ones that were not committed.
                this.manifestLists.add(manifestList.location());
                ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
                Tasks.range(manifestFiles.length).stopOnFailure().throwFailureWhenFinished().executeWith(ThreadPools.getWorkerPool()).run((index) -> {
                    manifestFiles[index] = this.manifestsWithMetadata.get(manifests.get(index));
                });
                writer.addAll(Arrays.asList(manifestFiles));
            } catch (IOException e) {
                throw new RuntimeIOException(e, "Failed to write manifest list file");
            }

            return new BaseSnapshot(this.ops.io(), sequenceNumber, this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifestList.location());
        }
    }

Next, the updated table metadata is produced by TableMetadata.replaceCurrentSnapshot():

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
        if (this.snapshotsById.containsKey(snapshot.snapshotId())) {
            // The snapshot is already known: just make it the current snapshot.
            return this.setCurrentSnapshotTo(snapshot);
        } else {
            // Format v2 requires each new snapshot to have a larger sequence number.
            ValidationException.check(this.formatVersion == 1 || snapshot.sequenceNumber() > this.lastSequenceNumber, "Cannot add snapshot with sequence number %s older than last sequence number %s", new Object[]{snapshot.sequenceNumber(), this.lastSequenceNumber});
            List newSnapshots = ImmutableList.builder().addAll(this.snapshots).add(snapshot).build();
            List newSnapshotLog = ImmutableList.builder().addAll(this.snapshotLog).add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId())).build();
            // TableMetadata is immutable: a new instance is returned with the snapshot appended and made current.
            return new TableMetadata((InputFile) null, this.formatVersion, this.uuid, this.location, snapshot.sequenceNumber(), snapshot.timestampMillis(), this.lastColumnId, this.schema, this.defaultSpecId, this.specs, this.defaultSortOrderId, this.sortOrders, this.properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, this.addPreviousFile(this.file, this.lastUpdatedMillis));
        }
    }
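
Because TableMetadata is immutable, adding a snapshot produces a new metadata object instead of mutating the old one. The following hypothetical MetaSketch keeps only the fields needed to show that, plus the format v2 sequence-number check. The real TableMetadata carries much more (schema, specs, properties, snapshot log and so on).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily reduced model of TableMetadata.replaceCurrentSnapshot().
final class MetaSketch {
    final int formatVersion;
    final long lastSequenceNumber;
    final List<Long> snapshotIds;
    final Long currentSnapshotId;

    MetaSketch(int formatVersion, long lastSequenceNumber, List<Long> snapshotIds, Long currentSnapshotId) {
        this.formatVersion = formatVersion;
        this.lastSequenceNumber = lastSequenceNumber;
        this.snapshotIds = Collections.unmodifiableList(new ArrayList<>(snapshotIds));
        this.currentSnapshotId = currentSnapshotId;
    }

    MetaSketch replaceCurrentSnapshot(long snapshotId, long sequenceNumber) {
        if (snapshotIds.contains(snapshotId)) {
            // Known snapshot: only the current-snapshot pointer changes.
            return new MetaSketch(formatVersion, lastSequenceNumber, snapshotIds, snapshotId);
        }
        if (formatVersion != 1 && sequenceNumber <= lastSequenceNumber) {
            throw new IllegalStateException("Cannot add snapshot with sequence number " + sequenceNumber
                    + " older than last sequence number " + lastSequenceNumber);
        }
        List<Long> newSnapshots = new ArrayList<>(snapshotIds);
        newSnapshots.add(snapshotId);
        // The old instance is untouched; callers swap to the returned one.
        return new MetaSketch(formatVersion, sequenceNumber, newSnapshots, snapshotId);
    }
}
```
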

Then BaseMetastoreTableOperations.commit() is called:

public void commit(TableMetadata base, TableMetadata metadata) {
        // Optimistic concurrency: fail if another commit changed the table since base was read.
        if (base != this.current()) {
            throw new CommitFailedException("Cannot commit: stale table metadata", new Object[0]);
        } else if (base == metadata) {
            LOG.info("Nothing to commit.");
        } else {
            long start = System.currentTimeMillis();
            this.doCommit(base, metadata);
            this.deleteRemovedMetadataFiles(base, metadata);
            this.requestRefresh();
            LOG.info("Successfully committed to table {} in {} ms", this.tableName(), System.currentTimeMillis() - start);
        }
    }

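Finally, HiveTableOperations.doCommit() performs the actual commit. This path applies to tables managed by a Hive catalog; a table loaded with TableLoader.fromHadoopTable(), as in the opening example, is committed through HadoopTableOperations instead.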

protected void doCommit(TableMetadata base, TableMetadata metadata) {
        // Write the new metadata file first; the commit itself only swaps the pointer to it.
        String newMetadataLocation = this.writeNewMetadata(metadata, this.currentVersion() + 1);
        boolean hiveEngineEnabled = hiveEngineEnabled(metadata, this.conf);
        boolean threw = true;
        boolean updateHiveTable = false;
        Optional<Long> lockId = Optional.empty();

        try {
            // Lock the table in the Hive metastore for the check-and-swap below.
            lockId = Optional.of(this.acquireLock());
            Table tbl = this.loadHmsTable();
            if (tbl != null) {
                if (base == null && tbl.getParameters().get("metadata_location") != null) {
                    throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
                }

                updateHiveTable = true;
                LOG.debug("Committing existing table: {}", this.fullName);
            } else {
                tbl = this.newHmsTable();
                LOG.debug("Committing new table: {}", this.fullName);
            }

            tbl.setSd(this.storageDescriptor(metadata, hiveEngineEnabled));
            // The metastore must still point at the metadata file this commit was based on.
            String metadataLocation = tbl.getParameters().get("metadata_location");
            String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
            if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
                throw new CommitFailedException("Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", new Object[]{baseMetadataLocation, metadataLocation, this.database, this.tableName});
            }

            // Point metadata_location at the new file and persist the HMS table.
            this.setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
            this.persistTable(tbl, updateHiveTable);
            threw = false;
        } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
            throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
        } catch (UnknownHostException | TException e) {
            if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
                throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't exist, this probably happened when using embedded metastore or doesn't create a transactional meta table. To fix this, use an alternative metastore", e);
            }

            throw new RuntimeException(String.format("Metastore operation failed for %s.%s", this.database, this.tableName), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during commit", e);
        } finally {
            // Delete the new metadata file if the commit failed, and always release the lock.
            this.cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
        }
    }
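
Both commit() and doCommit() implement optimistic concurrency around a single pointer, the metastore's metadata_location. A commit succeeds only if the pointer still names the metadata file the writer started from. Otherwise a CommitFailedException makes SnapshotProducer.commit() refresh and retry. This hypothetical sketch uses a synchronized method in place of the Hive metastore lock and a String field in place of the HMS table parameter.

```java
import java.util.Objects;

// Hypothetical model of a catalog that commits by checking and swapping one pointer.
final class PointerSwapCatalog {
    private String metadataLocation; // stand-in for the HMS "metadata_location" parameter

    PointerSwapCatalog(String initialLocation) {
        this.metadataLocation = initialLocation;
    }

    synchronized String current() {
        return metadataLocation;
    }

    // false corresponds to CommitFailedException: the caller must refresh, re-apply and retry.
    synchronized boolean commit(String baseLocation, String newLocation) {
        if (!Objects.equals(metadataLocation, baseLocation)) {
            return false; // someone else committed since baseLocation was read
        }
        metadataLocation = newLocation;
        return true;
    }
}
```

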
Appendix: Flink task execution flow

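Task lifecycle:
StreamTask is the base class of all streaming tasks. A task runs one or more StreamOperators (several when operators are chained). Chained operators run synchronously in the same thread.
Execution: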

@Override
	public final void invoke() throws Exception {
		try {
			beforeInvoke();

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			runMailboxLoop();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			afterInvoke();
		}
		catch (Exception invokeException) {
			try {
				cleanUpInvoke();
			}
			catch (Throwable cleanUpException) {
				throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
			}
			throw invokeException;
		}
		cleanUpInvoke();
	}

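beforeInvoke() performs the initialization work, including building the operator chain and setting up all of its operators.
runMailboxLoop() runs the task's main processing loop.
afterInvoke() finishes the task.
Call hierarchy: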

-- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> initialize-operator-states()
*        +----> open-operators()
*        +----> run()
* --------------> mailboxProcessor.runMailboxLoop();
* --------------> StreamTask.processInput()
* --------------> StreamTask.inputProcessor.processInput()
* --------------> indirectly calls the operator's processElement() and processWatermark()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()
  1. Create the state backend that provides state for all operators in the OperatorChain
  2. Load all operators in the OperatorChain
  3. Call setup() on every operator
  4. Task-specific initialization
  5. Call initializeState() on every operator
  6. Call open() on every operator
  7. The run method processes data in a loop
  8. Call close() on every operator
  9. Call dispose() on every operator
  10. Common cleanup
  11. Task-specific cleanup
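
The ordering that matters for the Iceberg operators (initializeState() before open(), and both before any processElement()) can be checked with a toy model. MiniOperator and MiniTask are hypothetical classes, not Flink's API. The model drives a single operator, whereas a real StreamTask repeats each phase for every operator in the chain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator that records which lifecycle method was called, in order.
final class MiniOperator {
    final List<String> calls = new ArrayList<>();

    void setup() { calls.add("setup"); }
    void initializeState() { calls.add("initializeState"); }
    void open() { calls.add("open"); }
    void processElement(String record) { calls.add("processElement:" + record); }
    void close() { calls.add("close"); }
    void dispose() { calls.add("dispose"); }
}

// Hypothetical task driving one operator through the phases listed above.
final class MiniTask {
    private MiniTask() {}

    static void invoke(MiniOperator op, List<String> input) {
        // beforeInvoke(): set up the operator, restore its state, then open it.
        op.setup();
        op.initializeState();
        op.open();
        // runMailboxLoop(): process records until the input ends.
        for (String record : input) {
            op.processElement(record);
        }
        // afterInvoke() and cleanup: close, then dispose.
        op.close();
        op.dispose();
    }
}
```

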
References

Iceberg learning resources

  1. https://iceberg.apache.org/#flink/
  2. https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/

Sharing is welcome; please credit the source, 内存溢出 (outofmemory.cn), when reposting.

Original article: http://outofmemory.cn/zaji/5668940.html
