- Overview
- Inheritance hierarchy
- Class diagram
- Call flow
- Underlying calls
- Examples and process analysis
- Overview
- Examples
- Summary
When Flink writes to Iceberg, the open() method of IcebergStreamWriter calls TaskWriterFactory.create(), which creates one of four kinds of writers (UnpartitionedDeltaWriter / UnpartitionedWriter / PartitionedDeltaWriter / RowDataPartitionedFanoutWriter). This article traces these four write paths.
The IcebergStreamWriter.open() method:
public void open() {
    this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = this.getRuntimeContext().getAttemptNumber();
    this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
    this.writer = this.taskWriterFactory.create();
}
The TaskWriterFactory.create() method:
public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(this.outputFileFactory,
        "The outputFileFactory shouldn't be null if we have invoked the initialize().");
    if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
        return (TaskWriter) (this.spec.isUnpartitioned()
            ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds)
            : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds));
    } else {
        return (TaskWriter) (this.spec.isUnpartitioned()
            ? new UnpartitionedWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes)
            : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory,
                this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema));
    }
}
Depending on whether equality field columns are specified and whether the table is partitioned, this method constructs a delta writer (UnpartitionedDeltaWriter / PartitionedDeltaWriter) or a plain writer (UnpartitionedWriter / RowDataPartitionedFanoutWriter).
Inheritance hierarchy
Class diagram
As the diagram shows, all of these writers ultimately extend the BaseTaskWriter abstract class. The difference is that the partitioned writers also handle the logic for generating partition keys.
Specifically:
- TaskWriter, BaseTaskWriter, UnpartitionedWriter, PartitionedWriter, and PartitionedFanoutWriter live in the org.apache.iceberg.io package of the iceberg-core module, which defines Iceberg's common write logic;
- PartitionedDeltaWriter, UnpartitionedDeltaWriter, and RowDataPartitionedFanoutWriter (an inner class of RowDataTaskWriterFactory) live in the org.apache.iceberg.flink.sink package and are implemented by the Flink connector module.
All of the writer classes above implement the TaskWriter interface:
public interface TaskWriter<T> extends Closeable {
    void write(T var1) throws IOException;

    void abort() throws IOException;

    default DataFile[] dataFiles() throws IOException {
        WriteResult result = this.complete();
        Preconditions.checkArgument(result.deleteFiles() == null || result.deleteFiles().length == 0,
            "Should have no delete files in this write result.");
        return result.dataFiles();
    }

    WriteResult complete() throws IOException;
}
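As orientation, here is a minimal usage sketch (not taken from the Iceberg codebase; the initialized factory and the collection of rows are assumptions) showing the TaskWriter contract: write rows, then complete() to collect the result files, or abort() to clean up on failure.

// Sketch only: assumes an initialized TaskWriterFactory and some RowData to write.
static WriteResult writeAll(TaskWriterFactory<RowData> taskWriterFactory, Iterable<RowData> rows) throws IOException {
    TaskWriter<RowData> writer = taskWriterFactory.create();
    try {
        for (RowData row : rows) {
            writer.write(row);        // may roll to a new file once the target file size is reached
        }
        return writer.complete();     // the data files (and delete files) produced by this task
    } catch (Exception e) {
        writer.abort();               // removes any files this task has already written
        throw e;
    }
}

The WriteResult collected this way is what IcebergStreamWriter emits to the downstream committer when a checkpoint is triggered.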
BaseTaskWriter implements TaskWriter. Its inner class RollingFileWriter (the class whose write() all of the writers above eventually call) extends the inner class BaseRollingWriter. When a BaseRollingWriter is constructed it calls openCurrent(), which creates a DataWriter. BaseRollingWriter.write() then calls DataWriter.add(), which finally calls FileAppender.add() to perform the actual write.
private abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
    private static final int ROWS_DIVISOR = 1000;
    private final PartitionKey partitionKey;
    private EncryptedOutputFile currentFile;
    private W currentWriter;
    private long currentRows;

    private BaseRollingWriter(PartitionKey partitionKey) {
        this.currentFile = null;
        this.currentWriter = null;
        this.currentRows = 0L;
        this.partitionKey = partitionKey;
        this.openCurrent();
    }

    abstract W newWriter(EncryptedOutputFile var1, PartitionKey var2);

    abstract long length(W var1);

    abstract void write(W var1, T var2);

    abstract void complete(W var1);

    public void write(T record) throws IOException {
        this.write(this.currentWriter, record);
        ++this.currentRows;
        if (this.shouldRollToNewFile()) {
            this.closeCurrent();
            this.openCurrent();
        }
    }

    private void openCurrent() {
        if (this.partitionKey == null) {
            this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile();
        } else {
            this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile(this.partitionKey);
        }
        this.currentWriter = this.newWriter(this.currentFile, this.partitionKey);
        this.currentRows = 0L;
    }
}
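For context, the RollingFileWriter subclass fills in these abstract hooks roughly as follows (a paraphrased sketch of the Iceberg source; exact signatures and field names vary between versions): newWriter() asks the FileAppenderFactory for a DataWriter, write() delegates to DataWriter.add() (which wraps FileAppender.add()), and complete() records the finished DataFile.

protected class RollingFileWriter extends BaseRollingWriter<DataWriter<T>> {
    public RollingFileWriter(PartitionKey partitionKey) {
        super(partitionKey);
    }

    DataWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
        return appenderFactory.newDataWriter(file, format, key);   // DataWriter wraps a format-specific FileAppender
    }

    long length(DataWriter<T> writer) {
        return writer.length();
    }

    void write(DataWriter<T> writer, T record) {
        writer.add(record);                                        // -> FileAppender.add()
    }

    void complete(DataWriter<T> closedWriter) {
        completedDataFiles.add(closedWriter.toDataFile());         // collected later by complete()/dataFiles()
    }
}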
Call flow
The write call chains of the classes above are as follows:
- With equality field columns specified:
UnpartitionedDeltaWriter calls its parent's write() method: BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
PartitionedDeltaWriter calls its parent's write() method: BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
Both go through BaseDeltaTaskWriter.write(); the difference lies in how route() is implemented:
abstract BaseDeltaTaskWriter.RowDataDeltaWriter route(RowData var1);

// PartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    this.partitionKey.partition(this.wrapper().wrap(row));
    RowDataDeltaWriter writer = (RowDataDeltaWriter) this.writers.get(this.partitionKey);
    if (writer == null) {
        PartitionKey copiedKey = this.partitionKey.copy();
        writer = new RowDataDeltaWriter(this, copiedKey);
        this.writers.put(copiedKey, writer);
    }
    return writer;
}

// UnpartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    return this.writer;
}
The shared BaseDeltaTaskWriter.write() then dispatches on the row kind:
public void write(RowData row) throws IOException {
    BaseDeltaTaskWriter.RowDataDeltaWriter writer = this.route(row);
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            writer.write(row);
            break;
        case DELETE:
        case UPDATE_BEFORE:
            writer.delete(row);
            break;
        default:
            throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
}
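To make the dispatch above concrete, here is a hedged illustration (field values are made up) of the RowData a changelog stream could feed into the delta writer; INSERT/UPDATE_AFTER rows land in data files, while DELETE/UPDATE_BEFORE rows become equality deletes on the configured key columns.

// Assumed imports: org.apache.flink.table.data.GenericRowData, org.apache.flink.table.data.StringData,
// org.apache.flink.types.RowKind
GenericRowData insert = GenericRowData.ofKind(RowKind.INSERT, StringData.fromString("u1"), 100L);
GenericRowData before = GenericRowData.ofKind(RowKind.UPDATE_BEFORE, StringData.fromString("u1"), 100L);
GenericRowData after = GenericRowData.ofKind(RowKind.UPDATE_AFTER, StringData.fromString("u1"), 200L);
GenericRowData delete = GenericRowData.ofKind(RowKind.DELETE, StringData.fromString("u1"), 200L);
// write(insert) / write(after)  -> RowDataDeltaWriter.write(row)  -> data file
// write(before) / write(delete) -> RowDataDeltaWriter.delete(row) -> equality delete file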
- Without equality field columns:
UnpartitionedWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
RowDataPartitionedFanoutWriter calls its parent's write() method: PartitionedFanoutWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
The difference between the unpartitioned and the partitioned writer is that the latter does some partition handling at the start of write(). PartitionedFanoutWriter handles this in its write() method as follows:
public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
    private final Map<PartitionKey, BaseTaskWriter<T>.RollingFileWriter> writers = Maps.newHashMap();

    protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    }

    protected abstract PartitionKey partition(T var1);

    public void write(T row) throws IOException {
        PartitionKey partitionKey = this.partition(row);
        BaseTaskWriter<T>.RollingFileWriter writer = (RollingFileWriter) this.writers.get(partitionKey);
        if (writer == null) {
            PartitionKey copiedKey = partitionKey.copy();
            writer = new RollingFileWriter(this, copiedKey);
            this.writers.put(copiedKey, writer);
        }
        writer.write(row);
    }
}
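Flink's RowDataPartitionedFanoutWriter only needs to supply the partition(T) hook. A paraphrased sketch of how it does so (constructor omitted; field names follow the Iceberg Flink connector but may differ by version): it wraps the Flink RowData so that Iceberg's partition transforms can evaluate it, then reuses a single PartitionKey instance that the parent class copies when caching a new writer.

private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
    private final PartitionKey partitionKey;     // reused for every row; copied by the parent when cached
    private final RowDataWrapper rowDataWrapper; // adapts Flink RowData to Iceberg's StructLike view

    @Override
    protected PartitionKey partition(RowData row) {
        this.partitionKey.partition(this.rowDataWrapper.wrap(row)); // evaluate the partition transforms for this row
        return this.partitionKey;
    }
}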
Note:
- Iceberg's built-in PartitionedWriter is an abstract class; Flink implements partitioned writes with RowDataPartitionedFanoutWriter.
- The appender invoked at the bottom of the chain is the FileAppender created by the FlinkAppenderFactory that is passed in when the TaskWriter is created in IcebergStreamWriter's open() method.
Underlying calls
When writing data, a FileAppender matching the configured file format is created; the FileAppender performs the actual file writes. Three file formats are currently supported: Parquet, Avro, and ORC.
public interface FileAppender<D> extends Closeable {
    void add(D var1);

    default void addAll(Iterator<D> values) {
        while (values.hasNext()) {
            this.add(values.next());
        }
    }

    default void addAll(Iterable<D> values) {
        this.addAll(values.iterator());
    }

    Metrics metrics();

    long length();

    default List<Long> splitOffsets() {
        return null;
    }
}
The FileAppender passed in when a DataWriter is created is produced by the newAppender() method of FlinkAppenderFactory or GenericAppenderFactory:
public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(this.config);
    try {
        switch (fileFormat) {
            case AVRO:
                return Avro.write(outputFile).schema(this.schema).createWriterFunc(DataWriter::create)
                    .setAll(this.config).overwrite().build();
            case PARQUET:
                return Parquet.write(outputFile).schema(this.schema).createWriterFunc(GenericParquetWriter::buildWriter)
                    .setAll(this.config).metricsConfig(metricsConfig).overwrite().build();
            case ORC:
                return ORC.write(outputFile).schema(this.schema).createWriterFunc(GenericOrcWriter::buildWriter)
                    .setAll(this.config).metricsConfig(metricsConfig).overwrite().build();
            default:
                throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat);
        }
    } catch (IOException var5) {
        throw new UncheckedIOException(var5);
    }
}
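As a hedged usage sketch (the output path, schema, and record values are made up for illustration), writing a couple of records directly through an appender obtained from GenericAppenderFactory looks like this:

// Assumed imports: java.io.File, org.apache.iceberg.Files, org.apache.iceberg.FileFormat, org.apache.iceberg.Schema,
// org.apache.iceberg.data.GenericAppenderFactory, org.apache.iceberg.data.GenericRecord, org.apache.iceberg.data.Record,
// org.apache.iceberg.io.FileAppender, org.apache.iceberg.types.Types
Schema schema = new Schema(
    Types.NestedField.required(1, "name", Types.StringType.get()));
GenericAppenderFactory appenderFactory = new GenericAppenderFactory(schema);

GenericRecord record = GenericRecord.create(schema);
try (FileAppender<Record> appender =
         appenderFactory.newAppender(Files.localOutput(new File("/tmp/sample.parquet")), FileFormat.PARQUET)) {
    record.setField("name", "a");
    appender.add(record.copy());
    record.setField("name", "b");
    appender.add(record.copy());
}
// After close(), appender.metrics() exposes the statistics (record count, bounds, ...) described below.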
- Iceberg does not write partition values directly into the data files; partitions are materialized through the directory tree. The partition directory layout is similar to Hive's, organized as key1=value1/key2=value2. Before writing data, the partitioned writer first evaluates the partition transform functions to get the partition values and then creates the corresponding partition directories (see the sketch after this list).
- The FileAppender writes data to files by delegating to the component for the chosen file format. The target size of written files can be tuned with the write.target-file-size-bytes table property; the default is LONG_MAX.
- After all data has been written, Iceberg collects write statistics such as record_count, lower_bound, upper_bound, and value_count, which are used on the driver side to generate the corresponding manifest files; the executors send this information back to the driver.
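To illustrate the partition layout and the target-file-size property mentioned above, here is a hedged sketch using the Java API (the warehouse path, table name, and field ids are assumptions):

// Assumed imports: org.apache.hadoop.conf.Configuration, org.apache.iceberg.PartitionSpec, org.apache.iceberg.Schema,
// org.apache.iceberg.Table, org.apache.iceberg.catalog.TableIdentifier, org.apache.iceberg.hadoop.HadoopCatalog,
// org.apache.iceberg.types.Types
Schema schema = new Schema(
    Types.NestedField.required(1, "uid", Types.StringType.get()),
    Types.NestedField.required(2, "dt", Types.StringType.get()),
    Types.NestedField.required(3, "hh", Types.StringType.get()));

// identity partitioning on dt and hh -> data files land under paths like .../data/dt=2021-11-01/hh=10/
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .identity("dt")
    .identity("hh")
    .build();

HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse/path");
Table table = catalog.createTable(TableIdentifier.of("db", "sample"), schema, spec);

// optional: lower the per-file target size (in bytes) that the rolling writers above check against
table.updateProperties()
    .set("write.target-file-size-bytes", String.valueOf(128L * 1024 * 1024))
    .commit();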
Examples and process analysis
The fields shared by the examples below are:
1. rowDataStream: the Flink input stream (a construction sketch follows this list), declared as:
DataStream<Row> input = … ;
2. Flink_SCHEMA: the Flink schema, created as follows:
TableSchema.Builder flinkTable = TableSchema.builder();
flinkTable.field("name", DataTypes.STRING());
TableSchema Flink_SCHEMA = flinkTable.build();
3. tableLoader: used to load the Iceberg table, created as follows:
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
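A hedged sketch of how the rowDataStream used in the examples could be built (the execution environment, checkpoint interval, and row values are assumptions for illustration):

// Assumed imports: org.apache.flink.api.common.typeinfo.Types, org.apache.flink.streaming.api.datastream.DataStream,
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, org.apache.flink.types.Row
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // Iceberg data files are committed when a Flink checkpoint completes

DataStream<Row> rowDataStream = env
    .fromElements(Row.of("a"), Row.of("b"))       // one STRING field, matching the "name" column in Flink_SCHEMA
    .returns(Types.ROW(Types.STRING));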
Examples
1. UnpartitionedDeltaWriter
Execution flow diagram:
If the table is not partitioned and equalityFieldColumns() is set, UnpartitionedDeltaWriter is used. Example code:
FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(Flink_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();
build() processes the equalityFieldColumns (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) and turns them into equalityFieldIds, the list of the corresponding schema field ids ([1, 2, 3, 9, 10, 11]).
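A paraphrased sketch of how build() resolves the configured column names into equalityFieldIds (the real code lives in FlinkSink.Builder; error handling is simplified):

// Paraphrased: look each configured column up in the Iceberg table schema and collect its field id
List<Integer> equalityFieldIds = new ArrayList<>();
for (String column : equalityFieldColumns) {      // e.g. ["uid", "mkey", "pla", "timestamp", "dt", "hh"]
    Types.NestedField field = table.schema().findField(column);
    Preconditions.checkNotNull(field,
        "Missing required equality field column '%s' in table schema", column);
    equalityFieldIds.add(field.fieldId());         // -> e.g. [1, 2, 3, 9, 10, 11]
}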
2. PartitionedDeltaWriter
Execution flow diagram:
If the table is partitioned and equalityFieldColumns() is set, PartitionedDeltaWriter is used. Example code:
FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(Flink_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();
build() processes the equalityFieldColumns (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) and turns them into equalityFieldIds, the list of the corresponding schema field ids ([1, 2, 3, 9, 10, 11]).
The route() call (shown earlier) produces the partitionKey and the corresponding writer, which holds information such as the data-file and delete-file paths:
3. UnpartitionedWriter
Execution flow diagram:
If the table is not partitioned (and no equality field columns are set), UnpartitionedWriter is used.
Example code:
FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(Flink_SCHEMA)
    .writeParallelism(1)
    .build();
4. RowDataPartitionedFanoutWriter
Execution flow diagram:
If the table is partitioned (and no equality field columns are set), RowDataPartitionedFanoutWriter is used.
Example code:
FlinkSink.forRow(rowDataStream, Flink_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(Flink_SCHEMA)
    .writeParallelism(1)
    .build();
Note:
If the equalityFieldColumns method throws an error, see here.
Summary
1. Writing to a v1 table with equalityFieldColumns is not supported (a table-creation sketch for v2 follows below).
2. Data written with equalityFieldColumns can only be read by Flink in batch mode.
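Relevant to point 1: equality deletes require format v2, so the target table needs to be created (or upgraded) with format-version=2. A hedged sketch using the Java catalog API (table name, path, and schema are assumptions):

// Assumed imports: org.apache.hadoop.conf.Configuration, org.apache.iceberg.PartitionSpec, org.apache.iceberg.Schema,
// org.apache.iceberg.catalog.TableIdentifier, org.apache.iceberg.hadoop.HadoopCatalog,
// org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap, org.apache.iceberg.types.Types
Schema schema = new Schema(
    Types.NestedField.required(1, "uid", Types.StringType.get()),
    Types.NestedField.optional(2, "mkey", Types.StringType.get()));

HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse/path");
catalog.createTable(
    TableIdentifier.of("db", "sample_v2"),
    schema,
    PartitionSpec.unpartitioned(),
    ImmutableMap.of("format-version", "2"));   // enables row-level (equality / position) deletes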
References:
- https://blog.csdn.net/u012794915/article/details/111831471
- https://blog.csdn.net/ztx18840831027/article/details/121360709
- https://www.iteblog.com/archives/9888.html
- https://blog.csdn.net/u012794915/article/details/111642801
- https://iceberg.apache.org/#flink/