数据血缘(data lineage)是数据治理(data governance)的重要组成部分,也是元数据管理、数据质量管理的有力工具。通俗地讲,数据血缘就是数据在产生、加工、流转到最终消费过程中形成的有层次的、可溯源的联系。成熟的数据血缘系统可以帮助开发者快速定位问题,以及追踪数据的更改,确定上下游的影响等等。
在数据仓库的场景下,数据的载体是数据库中的表和列(字段),相应地,数据血缘根据粒度也可以分为较粗的表级血缘和较细的列(字段)级血缘。离线数仓的数据血缘提取已经有了成熟的方法,如利用Hive提供的LineageLogger与Execution Hooks机制。本文就来简要介绍一种在实时数仓中基于Calcite解析Flink SQL列级血缘的方法,在此之前,先用几句话聊聊Calcite的关系式元数据体系。
Calcite关系式元数据在Calcite内部,库表元数据由Catalog来处理,关系式元数据才会被冠以[Rel]metadata的名称。关系式元数据与RelNode对应,以下是与其相关的Calcite组件:
- RelmetadataQuery:为关系式元数据提供统一的访问接口;
- RelmetadataProvider:为RelmetadataQuery各接口提供实现的中间层;
- metadataFactory:生产并维护RelmetadataProvider的工厂;
- metadataHandler:处理关系式元数据的具体实现逻辑,全部位于org.apache.calcite.rel.metadata包下,且类名均以RelMd作为前缀。
Calcite内置了许多种默认的关系式元数据实现,并以接口的形式统一维护在BuiltInmetadata抽象类里,如下图所示,名称都比较直白(如RowCount就表示该RelNode查询结果的行数)。
其中,ColumnOrigin.Handler就是负责解析列级血缘的metadataHandler,对各类RelNode分别定义了相应的寻找起源列的方法,其结构如下图所示。具体源码会另外写文章专门讲解,本文先不提。
处理`Snapshot` RelNode的方法是笔者新增的注意包括ColumnOrigin.Handler在内的绝大多数metadataHandler都是靠ReflectiveRelmetadataProvider来发挥作用。顾名思义,ReflectiveRelmetadataProvider通过反射取得各个metadataHandler中的方法,并在内部维护RelNode具体类型和通过Java Proxy生成的metadata代理对象(其中包含Handler方法)的映射。这样,通过RelmetadataQuery获取关系式元数据时,用户的请求就可以根据RelNode类型正确地dispatch到对应的方法上去。
另外,还有少数metadataHandler(如CumulativeCost/NonCumulativeCost对应的Handlers)在Calcite工程里找不到具体的实现。它们的代码是运行时生成的,并由JaninoRelmetadataProvider做动态编译。关于代码生成和Janino也在计划中,暂不赘述。
当然实际应用时我们不需要了解这些细节,只需要与RelmetadataQuery打交道。下面就来看看如何通过它取得我们想要的Flink SQL列血缘。
解析Flink SQL列级血缘以Flink SQL任务中最为常见的单条INSERT INTO ... SELECt ...为例,首先我们需要取得SQL语句生成的RelNode对象,即逻辑计划树。
为了方便讲解,这里笔者简单粗暴地在o.a.f.table.api.internal.TableEnvironmentImpl类中定义了一个getInsertOperation()方法。它负责解析、验证SQL语句,生成CatalogSinkModifyOperation,并取得它的PlannerQueryOperation子节点(即SELECT *** 作)。代码如下。
public Tuple3, QueryOperation> getInsertOperation(String insertStmt) { List operations = getParser().parse(insertStmt); if (operations.size() != 1) { throw new TableException( "Unsupported SQL query! getInsertOperation() only accepts a single INSERT statement."); } Operation operation = operations.get(0); if (operation instanceof CatalogSinkModifyOperation) { CatalogSinkModifyOperation sinkOperation = (CatalogSinkModifyOperation) operation; QueryOperation queryOperation = sinkOperation.getChild(); return new Tuple3<>( sinkOperation.getTableIdentifier().asSummaryString(), sinkOperation.getDynamicOptions(), queryOperation); } else { throw new TableException("only INSERT is supported now."); } }
接下来就能够取得Sink的表名以及对应的RelNode根节点。示例SQL来自之前的<
val tableEnv = StreamTableEnvironment.create(streamEnv, EnvironmentSettings.newInstance().build()) val sql = s""" |INSERT INTO tmp.print_joined_result |SELECt FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, a.userId, a.eventType, a.siteId, b.site_name AS siteName |FROM rtdw_ods.kafka_analytics_access_log_app a |LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.main_site_id |WHERe a.userId > 7 |""".stripMargin val insertOp = tableEnv.asInstanceOf[TableEnvironmentImpl].getInsertOperation(sql) val tableName = insertOp.f0 val relNode = insertOp.f2.asInstanceOf[PlannerQueryOperation].getCalciteTree
然后对取得的RelNode进行逻辑优化,即执行之前所讲过的FlinkStreamProgram,但仅执行到LOGICAL_REWRITE阶段为止。我们在本地将FlinkStreamProgram复制一份,并删去PHYSICAL和PHYSICAL_REWRITE两个阶段,即:
object FlinkStreamProgramLogicalonly { val SUBQUERY_REWRITE = "subquery_rewrite" val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite" val DECORRELATE = "decorrelate" val TIME_INDICATOR = "time_indicator" val DEFAULT_REWRITE = "default_rewrite" val PREDICATE_PUSHDOWN = "predicate_pushdown" val JOIN_REORDER = "join_reorder" val PROJECT_REWRITE = "project_rewrite" val LOGICAL = "logical" val LOGICAL_REWRITE = "logical_rewrite" def buildProgram(config: Configuration): FlinkChainedProgram[StreamOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]() // rewrite sub-queries to joins chainedProgram.addLast( SUBQUERY_REWRITE, FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext] // rewrite QueryOperationCatalogViewTable before rewriting sub-queries .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.TABLE_REF_RULES) .build(), "convert table references before rewriting sub-queries to semi-join") .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.SEMI_JOIN_RULES) .build(), "rewrite sub-queries to semi-join") .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES) .build(), "sub-queries remove") // convert RelOptTableImpl (which exists in SubQuery before) to FlinkRelOptTable .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.TABLE_REF_RULES) .build(), "convert table references after sub-queries removed") .build()) // rewrite special temporal join plan // ... // query decorrelation // ... // convert time indicators // ... // default rewrite, includes: predicate simplification, expression reduction, window // properties rewrite, etc. // ... // rule based optimization: push down predicate(s) in where clause, so it only needs to read // the required data // ... // join reorder // ... // project rewrite // ... // optimize the logical plan chainedProgram.addLast( LOGICAL, FlinkVolcanoProgramBuilder.newBuilder .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES) .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL)) .build()) // logical rewrite chainedProgram.addLast( LOGICAL_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkStreamRuleSets.LOGICAL_REWRITE) .build()) chainedProgram } }
执行FlinkStreamProgramLogicalOnly即可。注意StreamOptimizeContext内需要传入的上下文信息,通过各种workaround取得(FunctionCatalog可以在TableEnvironmentImpl内增加一个Getter拿到)。
val logicalProgram = FlinkStreamProgramLogicalOnly.buildProgram(tableEnvConfig) val optRelNode = logicalProgram.optimize(relNode, new StreamOptimizeContext { override def getTableConfig: TableConfig = tableEnv.getConfig override def getFunctionCatalog: FunctionCatalog = tableEnv.asInstanceOf[TableEnvironmentImpl].getFunctionCatalog override def getCatalogManager: CatalogManager = tableEnv.asInstanceOf[TableEnvironmentImpl].getCatalogManager override def getRexBuilder: RexBuilder = relNode.getCluster.getRexBuilder override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getSqlExprToRexConverterFactory override def isUpdateBeforeRequired: Boolean = false override def needFinalTimeIndicatorConversion: Boolean = true override def getMiniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE })
对比一下优化前与优化后的RelNode:
--- Original RelNode --- LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$3], eventType=[$4], siteId=[$8], siteName=[$46]) LogicalFilter(condition=[>($3, 7)]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{8, 44}]) LogicalProject(ts=[$0], tss=[$1], tssDay=[$2], userId=[$3], eventType=[$4], columnType=[$5], fromType=[$6], grouponId=[$7], , procTime=[PROCTIME()]) LogicalTableScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]]) LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $8)]) LogicalSnapshot(period=[$cor0.procTime]) LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]]) --- Optimized RelNode --- FlinkLogicalCalc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName]) FlinkLogicalJoin(condition=[=($4, $6)], joinType=[left]) FlinkLogicalCalc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)]) FlinkLogicalTableSourceScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], fields=[ts, tss, tssDay, userId, eventType, columnType, fromType, grouponId, , latitude, longitude], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]]) FlinkLogicalSnapshot(period=[$cor0.procTime]) FlinkLogicalCalc(select=[site_name, main_site_id]) FlinkLogicalTableSourceScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]], fields=[site_id, site_name, site_city_id, ])
这里需要注意两个问题。
其一,Calcite中RelMdColumnOrigins这个Handler类里并没有处理Snapshot类型的RelNode,走fallback逻辑则会对所有非叶子节点的RelNode返回空,所以默认情况下是拿不到Lookup Join字段的血缘关系的。我们还需要修改它的源码,在遇到Snapshot时继续深搜:
public SetgetColumnOrigins(Snapshot rel, RelmetadataQuery mq, int iOutputColumn) { return mq.getColumnOrigins(rel.getInput(), iOutputColumn); }
其二,Flink使用的Calcite版本为1.26,但是该版本不会追踪派生列(isDerived == true,例如SUM(col))的血缘。1.27版本修复了此问题,为避免大版本不兼容,可以将对应的issue CALCITE-4251 cherry-pick到内部的Calcite 1.26分支上来。当然别忘了重新编译Calcite Core和Flink Table模块。
最后就可以通过RelmetadataQuery取得结果表中字段的起源列了。So easy.
val metadataQuery = optRelNode.getCluster.getmetadataQuery for (i <- 0 to 4) { val origins = metadataQuery.getColumnOrigins(optRelNode, i) if (origins != null) { for (rco <- origins) { val table = rco.getOriginTable val tableName = table.getQualifiedName.mkString(".") val ordinal = rco.getOriginColumnOrdinal val fields = table.getRowType.getFieldNames println(Seq(tableName, ordinal, fields.get(ordinal)).mkString("t")) } } else { println("NULL") } }
上面例子中的SQL语句比较简单,因此产生的ColumnOrigin也只有单列。看官可自行用多表JOIN或者有聚合逻辑的SQL来测试,多列ColumnOrigin的情况下也很好用,免去了自行折腾RelVisitor或者RelShuttle的许多麻烦。
最后的血缘可视化这一步,普遍采用Neo4j、JanusGraph等图数据库承载并展示列血缘关系的数据。笔者也正在探索将Flink SQL列级血缘集成到Atlas的方法,进度比较慢,期望值请勿太高。
The End博客荒废良久,惊动大佬出面催更,惭愧惭愧。
受疫情影响,FFA 2021转为线上,不能面基真可惜(
炒鸡感谢会务组发来的大礼包~
也欢迎大家届时光临本鶸的presentation~
民那晚安晚安。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)