Iceberg源码学习:flink写iceberg四种TaskWriter区别

Iceberg源码学习:flink写iceberg四种TaskWriter区别,第1张

Iceberg源码学习:flink写iceberg四种TaskWriter区别

目录
  • 开始
  • 继承关系
    • 关系图
    • 调用流程
  • 底层调用
  • 实例与过程分析
    • 开始
    • 实例
    • 总结

开始

flink写iceberg时,IcebergStreamWriter的open()方法中,会调用TaskWriterFactory.create(),会创建四种类型的写(UnpartitionedDeltaWriter/UnpartitionedWriter/PartitionedDeltaWriter/RowDataPartitionedFanoutWriter),本文主要追踪这四种类型的写。
其中,IcebergStreamWriter.open()方法:

    public void open() {
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getAttemptNumber();
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
        this.writer = this.taskWriterFactory.create();
    }

TaskWriterFactory.create()方法:

    public TaskWriter create() {
        Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");
        if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
            return (TaskWriter)(this.spec.isUnpartitioned() ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds) : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds));
        } else {
            return (TaskWriter)(this.spec.isUnpartitioned() ? new UnpartitionedWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes) : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema));
        }
    }
继承关系

此方法中根据是否指定字段,构造分区写(PartitionedDeltaWriter/RowDataPartitionedFanoutWriter)和非分区写实例(UnpartitionedDeltaWriter/UnpartitionedWriter)

关系图


从图中可以看出,几种类型的写均继承自baseTaskWriter抽象类。区别在于 Partitioned方式的写需要处理一些分区 Key 生成的逻辑。
其中:

  1. TaskWriter/baseTaskWriter/UnpartitionedWriter/PartitionedWriter/RowDataPartitionedFanoutWriter均在org.apache.iceberg.io 这个包,这里面的类或接口都是在 iceberg-core 模块中,这里面定义了 Iceberg 写数据的公共逻辑;
  2. PartitionedDeltaWriter和UnpartitionedDeltaWriter是在org.apache.iceberg.flink.sink包中,flink connector模块实现的。

以上类均实现TaskWriter接口:

public interface TaskWriter extends Closeable {
    void write(T var1) throws IOException;

    void abort() throws IOException;

    default DataFile[] dataFiles() throws IOException {
        WriteResult result = this.complete();
        Preconditions.checkArgument(result.deleteFiles() == null || result.deleteFiles().length == 0, "Should have no delete files in this write result.");
        return result.dataFiles();
    }

    WriteResult complete() throws IOException;
}

baseTaskWriter继承自TaskWriter,内部类RollingFileWriter(以上几种最终调用的RollingFileWriter.write())继承自内部类baseRollingWriter,创建baseRollingWriter时,调用openCurrent(),会生成DataWriter。baseRollingWriter的write()方法会调用datawriter.add(),最后调用FileAppender的add()方法进行数据的写入工作。

    private abstract class baseRollingWriter implements Closeable {
        private static final int ROWS_DIVISOR = 1000;
        private final PartitionKey partitionKey;
        private EncryptedOutputFile currentFile;
        private W currentWriter;
        private long currentRows;

        private baseRollingWriter(PartitionKey partitionKey) {
            this.currentFile = null;
            this.currentWriter = null;
            this.currentRows = 0L;
            this.partitionKey = partitionKey;
            this.openCurrent();
        }

        abstract W newWriter(EncryptedOutputFile var1, PartitionKey var2);

        abstract long length(W var1);

        abstract void write(W var1, T var2);

        abstract void complete(W var1);

        public void write(T record) throws IOException {
            this.write(this.currentWriter, record);
            ++this.currentRows;
            if (this.shouldRollTonewFile()) {
                this.closeCurrent();
                this.openCurrent();
            }
        }

        private void openCurrent() {
            if (this.partitionKey == null) {
                this.currentFile = baseTaskWriter.this.fileFactory.newOutputFile();
            } else {
                this.currentFile = baseTaskWriter.this.fileFactory.newOutputFile(this.partitionKey);
            }

            this.currentWriter = this.newWriter(this.currentFile, this.partitionKey);
            this.currentRows = 0L;
        }
    }
调用流程

以下是以上类做写 *** 作的调用流程

  1. 指定字段:

UnpartitionedDeltaWriter 调用父类的write()方法: baseDeltaTaskWriter.write() -> baseEqualityDeltaWriter.write() -> baseRollingWriter.write()-> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

PartitionedDeltaWriter 调用父类的write()方法: baseDeltaTaskWriter.write() -> baseEqualityDeltaWriter.write() -> baseRollingWriter.write()-> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

两种方式均调用baseDeltaTaskWriter的write()方法,区别在于route()的实现不同:

abstract baseDeltaTaskWriter.RowDataDeltaWriter route(RowData var1);

// PartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    this.partitionKey.partition(this.wrapper().wrap(row));
    RowDataDeltaWriter writer = (RowDataDeltaWriter)this.writers.get(this.partitionKey);
    if (writer == null) {
        PartitionKey copiedKey = this.partitionKey.copy();
        writer = new RowDataDeltaWriter(this, copiedKey);
        this.writers.put(copiedKey, writer);
    }

    return writer;
}

// UnpartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    return this.writer;
}
public void write(RowData row) throws IOException {
        baseDeltaTaskWriter.RowDataDeltaWriter writer = this.route(row);
        switch(row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            writer.write(row);
            break;
        case DELETE:
        case UPDATE_BEFORE:
            writer.delete(row);
            break;
        default:
            throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
        }

    }
  1. 未指定字段:

UnpartitionedWriter.write() -> baseRollingWriter.write()-> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

RowDataPartitionedFanoutWriter调用父类的write() 方法: PartitionedFanoutWriter.write() -> baseRollingWriter.write ()-> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

以上非分区写和分区写,区别在于开始调用write()时,做些分区处理的工作。PartitionedFanoutWriter会在write()方法中做特殊处理,如下:

public abstract class PartitionedFanoutWriter extends baseTaskWriter {
    private final Map.RollingFileWriter> writers = Maps.newHashMap();

    protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    }

    protected abstract PartitionKey partition(T var1);

    public void write(T row) throws IOException {
        PartitionKey partitionKey = this.partition(row);
        baseTaskWriter.RollingFileWriter writer = (RollingFileWriter)this.writers.get(partitionKey);
        if (writer == null) {
            PartitionKey copiedKey = partitionKey.copy();
            writer = new RollingFileWriter(this, copiedKey);
            this.writers.put(copiedKey, writer);
        }

        writer.write(row);
    }
}

另:

  1. iceberg自带的PartitionedWriter是一个抽象类,flink用RowDataPartitionedFanoutWriter实现分区写。
  2. 底层调用的appender为IcebergStreamWriter的open()方法中创建 TaskWriter传入的FlinkAppenderFactory创建的FileAppender。
底层调用

写数据会根据file format生成对应的FileAppender,FileAppender完成实际的写文件 *** 作。目前支持3种文件格式的写入:Parquet、Avro以及Orc

public interface FileAppender extends Closeable {
    void add(D var1);

    default void addAll(Iterator values) {
        while(values.hasNext()) {
            this.add(values.next());
        }

    }

    default void addAll(Iterable values) {
        this.addAll(values.iterator());
    }

    Metrics metrics();

    long length();

    default List splitOffsets() {
        return null;
    }
}

DataWriter创建时,传入的FileAppender参数,由FlinkAppenderFactory或GenericAppenderFactory的newAppender()方法生成

public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) {
        MetricsConfig metricsConfig = MetricsConfig.fromProperties(this.config);

        try {
            switch(fileFormat) {
            case AVRO:
                return Avro.write(outputFile).schema(this.schema).createWriterFunc(DataWriter::create).setAll(this.config).overwrite().build();
            case PARQUET:
                return Parquet.write(outputFile).schema(this.schema).createWriterFunc(GenericParquetWriter::buildWriter).setAll(this.config).metricsConfig(metricsConfig).overwrite().build();
            case ORC:
                return ORC.write(outputFile).schema(this.schema).createWriterFunc(GenericOrcWriter::buildWriter).setAll(this.config).metricsConfig(metricsConfig).overwrite().build();
            default:
                throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat);
            }
        } catch (IOException var5) {
            throw new UncheckedIOException(var5);
        }
    }

  1. iceberg分区数据不直接写入数据文件中,而是通过目录树结构来进行存储,分区目录结构与hive类型,都是以key1=value1/key2=value2的形式进行组织。在写入数据之前,partitionWriter首先根据partition transform函数得到对应的partition value,然后创建对应的分区目录
  2. fileAppender通过调用不同的file format组件将数据写入到文件中。iceberg写入时可以通过设置write.target-file-size-bytes table property调整写入文件target大小,默认为LONG_MAX
  3. 当所有数据写入完成后,iceberg会收集写入的统计信息,例如record_count, lower_bound, upper_bound, value_count等用于driver端生成对应的manifest文件,最后executor端将这些信息传回driver端。
实例与过程分析 开始

以下共用字段解释:
1、rowDataStream:flink输入流 ,如下:

DataStream input = … ;

2、Flink_SCHEMA:flink schema,创建如下:

TableSchema.Builder flinkTable = TableSchema.builder();
flinkTable.field(“name”, DataTypes.STRING());
TableSchema Flink_SCHEMA = flinkTable.build();

3、tableLoader:用于加载iceberg表,创建方式如下:

TableLoader tableLoader = TableLoader.fromHadoopTable(“hdfs://nn:8020/warehouse/path”);

实例

1、UnpartitionedDeltaWriter
执行过程图:

如果创建的表不是分区表并且设置参数equalityFieldColumns(),会调用此类UnpartitionedDeltaWriter,实例代码:

FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
                .tableLoader(tableLoader)
                .tableSchema(Flink_SCHEMA)
                .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla","timestamp", "dt", "hh"))
                .writeParallelism(1)
                .build();

build()方法会对equalityFieldColumns ([“uid”, “mkey”, “pla”,“timestamp”, “dt”, “hh”]) 字段进行处理,生成equalityFieldIds即表字段的schema id列表 ([1,2,3,9,10,11])。

2、PartitionedDeltaWriter
执行过程图:

如果创建的表是分区表并且设置参数equalityFieldColumns(),会调用此类PartitionedDeltaWriter,实例代码:

FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
                .tableLoader(tableLoader)
                .tableSchema(Flink_SCHEMA)
                .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla","timestamp", "dt", "hh"))
                .writeParallelism(1)
                .build();

build()方法会对equalityFieldColumns ([“uid”, “mkey”, “pla”,“timestamp”, “dt”, “hh”]) 字段进行处理,生成equalityFieldIds即表字段的schema id列表 ([1,2,3,9,10,11])。

以下是route(),会生成分区字段partitionKey和writer,其中包括数据文件和delete文件路径等信息:

3、UnpartitionedWriter
执行过程图:

如果创建的表不是分区表,会调用此类UnpartitionedWriter。
实例代码:

FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
                .tableLoader(tableLoader)
                .tableSchema(Flink_SCHEMA)
                .writeParallelism(1)
                .build();

4、RowDataPartitionedFanoutWriter
执行过程图:

如果创建的表是分区表,会调用此类RowDataPartitionedFanoutWriter。
实例代码:

FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
                .tableLoader(tableLoader)
                .tableSchema(Flink_SCHEMA)
                .writeParallelism(1)
                .build();

另:

使用equalityFieldColumns方法若报错,请参考这里

总结

1、使用equalityFieldColumns写v1表不支持
2、使用equalityFieldColumns写只能flink批读取。

参考:

  1. https://blog.csdn.net/u012794915/article/details/111831471
  2. https://blog.csdn.net/ztx18840831027/article/details/121360709
  3. https://www.iteblog.com/archives/9888.html
  4. https://blog.csdn.net/u012794915/article/details/111642801
  5. https://iceberg.apache.org/#flink/

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5689105.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存