In the article Flink Table/SQL自定义Sources和Sinks全解析(附代码) we covered how to implement custom sources and sinks in Flink Table/SQL. With that background, it becomes much easier to understand how Flink Table/SQL reads from and writes to Hudi.
Dynamic tables are the core of custom sources/sinks. Looking at the source code of the flink-hudi module, the class org.apache.hudi.table.HoodieTableFactory implements both the DynamicTableSourceFactory and DynamicTableSinkFactory interfaces, and provides the connector-specific logic. In other words, when we set the connector to hudi in Flink SQL, this is the code path that is taken:
```sql
CREATE TABLE t1 (
  col BIGINT
) WITH (
  'connector' = 'hudi',
  ...
);
```
The UML diagram of the HoodieTableFactory class is shown below.
The static variable FACTORY_ID and the factoryIdentifier() method declare hudi as the identifier of this connector:
```java
public static final String FACTORY_ID = "hudi";

@Override
public String factoryIdentifier() {
  return FACTORY_ID;
}
```
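How does Flink find this factory in the first place? Connector factories are discovered through Java's ServiceLoader mechanism: the connector jar typically registers the factory class in a service file, and the planner matches the value of the 'connector' option against each discovered factory's factoryIdentifier(). A sketch of that registration (the exact path and contents may differ across Hudi versions):

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.apache.hudi.table.HoodieTableFactory
```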
The createDynamicTableSource() method builds the HoodieTableSource used for reading Hudi data. Its implementation is as follows:
```java
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
  Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
  ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
  sanityCheck(conf, schema);
  setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
  Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
      new ValidationException("Option [path] should not be empty.")));
  return new HoodieTableSource(
      schema,
      path,
      context.getCatalogTable().getPartitionKeys(),
      conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
      conf);
}
```
Inside createDynamicTableSource, sanityCheck(conf, schema) first validates the schema and options declared in the Flink SQL DDL; setupConfOptions then derives configuration from the DDL, such as the table name and primary key; finally, the resulting configuration is passed to a new HoodieTableSource.
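To make the first step concrete, here is a hypothetical sketch of the kind of checks sanityCheck might perform; the rules shown are illustrative assumptions, not Hudi's actual implementation:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.hudi.configuration.FlinkOptions;

class SanityCheckSketch {
  static void sanityCheck(Configuration conf, ResolvedSchema schema) {
    // a Hudi table must live somewhere on the file system
    if (!conf.getOptional(FlinkOptions.PATH).isPresent()) {
      throw new ValidationException("Option [path] should not be empty.");
    }
    // upserts need a record key: either a DDL primary key or an explicit option
    if (!schema.getPrimaryKey().isPresent()
        && !conf.getOptional(FlinkOptions.RECORD_KEY_FIELD).isPresent()) {
      throw new ValidationException("Primary key or record key field is required.");
    }
  }
}
```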
HoodieTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, and SupportsFilterPushDown. It defines how a Hudi table is read: whether we read as a stream, as a bounded batch, or starting from a particular point in time is all configured here.
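To make the role of these ability interfaces concrete, below is a minimal, self-contained sketch of a scan source that accepts limit and filter push-down. The method bodies are illustrative placeholders, not HoodieTableSource's real logic:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

/** Minimal sketch (not Hudi's real code): a scan source accepting limit/filter push-down. */
class SketchTableSource implements ScanTableSource, SupportsLimitPushDown, SupportsFilterPushDown {

  private long limit = -1L; // -1 means "no limit pushed down"
  private List<ResolvedExpression> filters = Collections.emptyList();

  @Override
  public void applyLimit(long limit) {
    // called by the planner when the query contains LIMIT n
    this.limit = limit;
  }

  @Override
  public Result applyFilters(List<ResolvedExpression> filters) {
    // remember the predicates, but report them all as "remaining" so the
    // planner still evaluates them; a real source would accept some of them
    this.filters = filters;
    return Result.of(Collections.emptyList(), filters);
  }

  @Override
  public ChangelogMode getChangelogMode() {
    return ChangelogMode.insertOnly();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
    // a real source returns a provider here, like HoodieTableSource does below
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public DynamicTableSource copy() {
    return new SketchTableSource();
  }

  @Override
  public String asSummaryString() {
    return "SketchTableSource";
  }
}
```

The planner calls applyLimit/applyFilters during optimization, which is how LIMIT and WHERE clauses reach the source before any data is read.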
Let's first analyze the streaming-read scenario, i.e. setting read.streaming.enabled = true in the Flink SQL DDL.
We start with the getScanRuntimeProvider method:
```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
  return new DataStreamScanProvider() {

    @Override
    public boolean isBounded() {
      return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
      @SuppressWarnings("unchecked")
      TypeInformation<RowData> typeInfo =
          (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
        StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
            conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
        InputFormat<RowData, ?> inputFormat = getInputFormat(true);
        OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
            StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
        SingleOutputStreamOperator<RowData> source = execEnv
            .addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
            .setParallelism(1)
            .transform("split_reader", typeInfo, factory)
            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
        return new DataStreamSource<>(source);
      } else {
        InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
        DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
        return source.name(getSourceOperatorName("bounded_source"))
            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
      }
    }
  };
}
```
When read.streaming.enabled = true is set, this method takes the streaming-read branch, namely:
```java
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
    conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
    StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv
    .addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
    .setParallelism(1)
    .transform("split_reader", typeInfo, factory)
    .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
```
In the streaming path above, StreamReadMonitoringFunction extends RichSourceFunction and implements CheckpointedFunction; it is the class that drives reading from the Hudi source. Let's look at its monitorDirAndForwardSplits method, which does the actual work: it discovers new data incrementally and emits it as input splits.
```java
@Override
public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
  checkpointLock = context.getCheckpointLock();
  while (isRunning) {
    synchronized (checkpointLock) {
      monitorDirAndForwardSplits(context);
    }
    TimeUnit.SECONDS.sleep(interval);
  }
}

@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
  HoodieTableMetaClient metaClient = getOrCreateMetaClient();
  if (metaClient == null) {
    // table does not exist
    return;
  }
  IncrementalInputSplits.Result result =
      incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
  if (result.isEmpty()) {
    // no new instants, returns early
    return;
  }
  for (MergeOnReadInputSplit split : result.getInputSplits()) {
    context.collect(split);
  }
  // update the issued instant time
  this.issuedInstant = result.getEndInstant();
  LOG.info("\n"
      + "------------------------------------------------------------\n"
      + "---------- consumed to instant: {}\n"
      + "------------------------------------------------------------", this.issuedInstant);
}
```
Reading also interacts with checkpointing: splits are emitted while holding the checkpoint lock, so while a checkpoint is in progress, reading pauses until the checkpoint completes, and the checkpoint records the current read position (the issued instant) into state. This is why, when the job fails and restarts from the last checkpoint, it resumes reading exactly where it left off.
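As a minimal sketch of this state-handling pattern (this is the generic CheckpointedFunction idiom with an assumed state name, not Hudi's exact code):

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Sketch: a source persists its read position (the issued instant) in operator state. */
abstract class IssuedInstantCheckpointSketch implements CheckpointedFunction {

  private transient ListState<String> instantState; // operator state holding the position
  protected String issuedInstant;                   // last instant handed downstream

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    instantState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>("issued-instant", String.class));
    if (context.isRestored()) {
      // resume from the instant recorded by the last successful checkpoint
      for (String instant : instantState.get()) {
        issuedInstant = instant;
      }
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // record the current read position; after a failure the job restarts from here
    instantState.clear();
    if (issuedInstant != null) {
      instantState.add(issuedInstant);
    }
  }
}
```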
When defining a Hudi read, we can also set read.start-commit to start consuming from a specific commit. These options are handled here:
```java
IncrementalInputSplits.Result result =
    incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
```
The full method is as follows:
```java
public Result inputSplits(
    HoodieTableMetaClient metaClient,
    org.apache.hadoop.conf.Configuration hadoopConf,
    String issuedInstant) {
  metaClient.reloadActiveTimeline();
  HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
  if (commitTimeline.empty()) {
    LOG.warn("No splits found for the table under path " + path);
    return Result.EMPTY;
  }
  List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
  // get the latest instant that satisfies condition
  final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
  final InstantRange instantRange;
  if (instantToIssue != null) {
    if (issuedInstant != null) {
      // the streaming reader may record the last issued instant, if the issued instant is present,
      // the instant range should be: (issued instant, the latest instant].
      instantRange = InstantRange.getInstance(issuedInstant, instantToIssue.getTimestamp(),
          InstantRange.RangeType.OPEN_CLOSE);
    } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
      // first time consume and has a start commit
      final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
      instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
          ? null
          : InstantRange.getInstance(startCommit, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
    } else {
      // first time consume and no start commit, consumes the latest incremental data set.
      instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
          InstantRange.RangeType.CLOSE_CLOSE);
    }
  } else {
    LOG.info("No new instant found for the table under path " + path + ", skip reading");
    return Result.EMPTY;
  }
  String tableName = conf.getString(FlinkOptions.TABLE_NAME);
  Set<String> writePartitions;
  final FileStatus[] fileStatuses;
  if (instantRange == null) {
    // reading from the earliest, scans the partitions and files directly.
    FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
    if (this.requiredPartitions != null) {
      // apply partition push down
      fileIndex.setPartitionPaths(this.requiredPartitions);
    }
    writePartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
    if (writePartitions.size() == 0) {
      LOG.warn("No partitions found for reading in user provided path.");
      return Result.EMPTY;
    }
    fileStatuses = fileIndex.getFilesInPartitions();
  } else {
    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline))
        .collect(Collectors.toList());
    List<HoodieCommitMetadata> archivedMetadataList =
        getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
    if (archivedMetadataList.size() > 0) {
      LOG.warn("\n"
          + "--------------------------------------------------------------------------------\n"
          + "---------- caution: the reader has fall behind too much from the writer,\n"
          + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
          + "--------------------------------------------------------------------------------");
    }
    // IMPORTANT: the merged metadata list must be in ascending order by instant time
    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
        ? mergeList(archivedMetadataList, activeMetadataList)
        : activeMetadataList;
    writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
    // apply partition push down
    if (this.requiredPartitions != null) {
      writePartitions = writePartitions.stream()
          .filter(this.requiredPartitions::contains).collect(Collectors.toSet());
    }
    if (writePartitions.size() == 0) {
      LOG.warn("No partitions found for reading in user provided path.");
      return Result.EMPTY;
    }
    fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
  }
  if (fileStatuses.length == 0) {
    LOG.warn("No files found for reading in user provided path.");
    return Result.EMPTY;
  }
  HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
  final String endInstant = instantToIssue.getTimestamp();
  final AtomicInteger cnt = new AtomicInteger(0);
  final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
  List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
      .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
          .map(fileSlice -> {
            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                .sorted(HoodieLogFile.getLogFileComparator())
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList()));
            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
            return new MergeOnReadInputSplit(cnt.getAndAdd(1),
                basePath, logPaths, endInstant,
                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
          }).collect(Collectors.toList()))
      .flatMap(Collection::stream)
      .collect(Collectors.toList());
  return Result.instance(inputSplits, endInstant);
}
```
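To summarize the three instant-range branches above, here is a purely illustrative walk-through (the timestamp values are made up; the range semantics follow the code):

```java
// Illustrative only, not Hudi API: which instant range each poll reads.
public class InstantRangeExample {
  public static void main(String[] args) {
    String issuedInstant = "20240101080000"; // position saved by a previous poll; null on first poll
    String startCommit = null;               // value of read.start-commit, if any
    String latestInstant = "20240101090000"; // newest completed instant on the timeline

    if (issuedInstant != null) {
      // subsequent polls: read only the new commits, (issuedInstant, latestInstant]
      System.out.println("(" + issuedInstant + ", " + latestInstant + "]");
    } else if (startCommit != null) {
      // first poll with read.start-commit: read [startCommit, latestInstant]
      System.out.println("[" + startCommit + ", " + latestInstant + "]");
    } else {
      // first poll without a start commit: read only the latest commit
      System.out.println("[" + latestInstant + ", " + latestInstant + "]");
    }
  }
}
```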
Of course, there are several ways to read a Hudi source, and the analysis above only covers the streaming case. Still, the same approach can be used to trace the code paths of the other read modes, and should be a useful aid when analyzing problems.