Flink SQL·validate


Core concepts

SqlValidator

Validates the parse tree of a SQL statement and provides semantic information about the parse tree.

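Instances are created with SqlValidatorUtil#newValidator. Validation uses the visitor pattern: each node's validate method calls back into the matching validator method. For example:

SqlLiteral#validate(SqlValidator, SqlValidatorScope) calls #validateLiteral(org.apache.calcite.sql.SqlLiteral)
SqlCall#validate(SqlValidator, SqlValidatorScope) calls #validateCall(SqlCall, SqlValidatorScope)

To resolve names to objects, the validator builds a map of the query's structure. The map consists of two kinds of objects. A SqlValidatorScope describes the tables and columns accessible at a particular point in the query; a SqlValidatorNamespace describes a data source used in the query. Different parts of a query have different kinds of namespace: for example, IdentifierNamespace for table names, SelectNamespace for SELECT queries, and SetopNamespace for UNION, EXCEPT and INTERSECT. The validator may wrap a namespace in another object that implements SqlValidatorNamespace, so do not cast your namespace or use instanceof; use SqlValidatorNamespace#unwrap(Class) and SqlValidatorNamespace#isWrapperFor(Class) instead. The validator builds the map with a quick scan over the root SqlNode the first time it receives it. After that, it supplies the correct scope or namespace object whenever it calls a validation method. The methods that return the scope for a particular clause are:

getSelectScope, getFromScope, getWhereScope, getGroupScope, getHavingScope, getOrderScope, getJoinScope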
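To make the callback flow concrete, here is a minimal, self-contained toy in Java. It is not Calcite code: ToyVisitorDemo and its nested classes are hypothetical stand-ins that only borrow Calcite's method names. A node's validate hands itself back to the validator, and the validator picks the clause's scope from a map that was filled in advance, rather than trusting whatever scope the caller passed in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of visitor-style validation. Names echo Calcite's, but these are
// simplified, hypothetical stand-ins, not Calcite's classes.
class ToyVisitorDemo {

  /** A scope only needs a name in this sketch. */
  interface Scope {
    String describe();
  }

  /** Each node type decides which validator method handles it (double dispatch). */
  abstract static class Node {
    abstract void validate(Validator validator, Scope scope);
  }

  static class Literal extends Node {
    final Object value;
    Literal(Object value) { this.value = value; }
    @Override void validate(Validator validator, Scope scope) { validator.validateLiteral(this); }
  }

  static class Call extends Node {
    final String op;
    final List<Node> operands;
    Call(String op, Node... operands) { this.op = op; this.operands = List.of(operands); }
    @Override void validate(Validator validator, Scope scope) { validator.validateCall(this, scope); }
  }

  static class Select extends Node {
    final Node where;
    Select(Node where) { this.where = where; }
    @Override void validate(Validator validator, Scope scope) { validator.validateSelect(this); }
  }

  static class Validator {
    final List<String> log = new ArrayList<>();
    // Filled once by a pre-scan of the root node (registerQuery plays this role in Calcite).
    final Map<Node, Scope> whereScopes = new HashMap<>();

    Scope getWhereScope(Select select) { return whereScopes.get(select); }

    void validateSelect(Select select) {
      log.add("validateSelect");
      // The validator, not the caller, supplies the scope for each clause.
      select.where.validate(this, getWhereScope(select));
    }

    void validateCall(Call call, Scope scope) {
      log.add("validateCall(" + call.op + ") in " + scope.describe());
      for (Node operand : call.operands) {
        operand.validate(this, scope); // recurse into the operands
      }
    }

    void validateLiteral(Literal literal) {
      log.add("validateLiteral(" + literal.value + ")");
    }
  }

  static List<String> demo() {
    Validator validator = new Validator();
    Select select = new Select(new Call("=", new Literal(1), new Literal(1)));
    validator.whereScopes.put(select, () -> "WhereScope"); // the "pre-scan"
    select.validate(validator, () -> "CatalogScope");
    return validator.log;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

Calling demo() yields validateSelect, then validateCall(=) in WhereScope, then validateLiteral(1) twice: the WHERE condition is checked in the WHERE scope even though the caller passed a CatalogScope.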

The type hierarchy is as follows:

SqlValidator (org.apache.calcite.sql.validate)
    SqlValidatorWithHints (org.apache.calcite.sql.validate)
        SqlValidatorImpl (org.apache.calcite.sql.validate)
            FarragoTestValidator in SqlToRelTestBase (org.apache.calcite.test)
            SqlAdvisorValidator (org.apache.calcite.sql.advise)
            CalciteSqlValidator (org.apache.calcite.prepare)
            ValidatorForTest in AbstractMaterializedViewTest (org.apache.calcite.test)
            FeatureValidator in SqlValidatorFeatureTest (org.apache.calcite.test)
            ContextSqlValidator (org.apache.calcite.jdbc)
            Anonymous in testRewriteExpansionOfColumnReferenceBeforeResolution() in SqlValidatorTest (org.apache.calcite.test)
SqlValidatorImpl

The default implementation of the SqlValidator interface.

SqlValidatorScope

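The name-resolution scope used during SQL validation. It represents the column names and table names that are visible at a particular point in the query.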

The type hierarchy is as follows:

SqlValidatorScope (org.apache.calcite.sql.validate)                   validation scope
    AggregatingScope (org.apache.calcite.sql.validate)                aggregating scope
        AggregatingSelectScope (org.apache.calcite.sql.validate)    aggregating scope for SELECT
    DelegatingScope (org.apache.calcite.sql.validate)                  delegates all operations to its parent scope; its subclasses are the scopes for the various kinds of SQL construct
        CatalogScope (org.apache.calcite.sql.validate)
        OrderByScope (org.apache.calcite.sql.validate)
        ListScope (org.apache.calcite.sql.validate)
            JoinScope (org.apache.calcite.sql.validate)
            MatchRecognizeScope (org.apache.calcite.sql.validate)
            CollectScope (org.apache.calcite.sql.validate)
            TableScope (org.apache.calcite.sql.validate)
            UnpivotScope (org.apache.calcite.sql.validate)
            OverScope (org.apache.calcite.sql.validate)
            PivotScope (org.apache.calcite.sql.validate)
            WithScope (org.apache.calcite.sql.validate)
            SelectScope (org.apache.calcite.sql.validate)
            Anonymous in convertMultisets() in SqlToRelConverter (org.apache.calcite.sql2rel)
        AggregatingSelectScope (org.apache.calcite.sql.validate)
        GroupByScope (org.apache.calcite.sql.validate)
    EmptyScope (org.apache.calcite.sql.validate)                      root of the scope chain, so that no scope ever has a null parent
        ParameterScope (org.apache.calcite.sql.validate)
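A sketch of how a chain of scopes might resolve an unqualified column, assuming a much simpler model than Calcite's: each hypothetical ListScope checks its own FROM items (alias to column list) and otherwise delegates to its parent, as DelegatingScope does, and an EmptyScope ends the chain so nobody has to null-check a parent. Real resolution also handles qualified names, ambiguity errors and case sensitivity, which this toy ignores.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of scope chaining; names mirror Calcite, behavior is simplified.
class ToyScopeDemo {

  interface Scope {
    /** Returns "alias.column" if the column is visible here or in an ancestor. */
    Optional<String> resolveColumn(String column);
  }

  /** Root of every chain: resolves nothing, so a parent is never null. */
  static final class EmptyScope implements Scope {
    @Override public Optional<String> resolveColumn(String column) {
      return Optional.empty();
    }
  }

  /** Simplified ListScope: its children are FROM items (alias -> column names). */
  static final class ListScope implements Scope {
    private final Scope parent;
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    ListScope(Scope parent) { this.parent = parent; }

    ListScope addChild(String alias, List<String> columns) {
      children.put(alias, columns);
      return this;
    }

    @Override public Optional<String> resolveColumn(String column) {
      for (Map.Entry<String, List<String>> child : children.entrySet()) {
        if (child.getValue().contains(column)) {
          return Optional.of(child.getKey() + "." + column);
        }
      }
      return parent.resolveColumn(column); // not found here: delegate upward
    }
  }

  /** An inner scope (alias a) nested in an outer scope (alias b), e.g. a correlated subquery. */
  static Scope exampleChain() {
    Scope root = new EmptyScope();
    Scope outer = new ListScope(root).addChild("b", List.of("id", "price"));
    return new ListScope(outer).addChild("a", List.of("id", "name"));
  }

  public static void main(String[] args) {
    Scope inner = exampleChain();
    System.out.println(inner.resolveColumn("name"));   // found in the inner scope
    System.out.println(inner.resolveColumn("price"));  // found via the parent
    System.out.println(inner.resolveColumn("seller")); // not visible anywhere
  }
}
```

Note that id resolves to a.id: the nearest scope wins in this sketch.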

The figure below shows a SelectScope instance:

[Figure: SelectScope]

SqlValidatorNamespace

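The SQL validation namespace. It describes a relation returned by part of a SQL query: a query can be split into several parts, and each part has a corresponding SqlValidatorNamespace. A namespace is a logical data source, such as a table or a subquery, and it stores the row type that its part of the query returns (written with the setType method).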

The type hierarchy is as follows:

SqlValidatorNamespace (org.apache.calcite.sql.validate)
    DelegatingNamespace (org.apache.calcite.sql.validate)
    AbstractNamespace (org.apache.calcite.sql.validate)
        IdentifierNamespace (org.apache.calcite.sql.validate)
            DmlNamespace in SqlValidatorImpl (org.apache.calcite.sql.validate)
        UnnestNamespace (org.apache.calcite.sql.validate)
        JoinNamespace (org.apache.calcite.sql.validate)
        FieldNamespace (org.apache.calcite.sql.validate)
        PivotNamespace (org.apache.calcite.sql.validate)
        ParameterNamespace (org.apache.calcite.sql.validate)
        TableConstructorNamespace (org.apache.calcite.sql.validate)
        MatchRecognizeNamespace (org.apache.calcite.sql.validate)
        SetopNamespace (org.apache.calcite.sql.validate)
        AliasNamespace (org.apache.calcite.sql.validate)
        WithNamespace (org.apache.calcite.sql.validate)
        TableNamespace (org.apache.calcite.sql.validate)
        CollectNamespace (org.apache.calcite.sql.validate)
        SchemaNamespace (org.apache.calcite.sql.validate)
        UnpivotNamespace (org.apache.calcite.sql.validate)
        SelectNamespace (org.apache.calcite.sql.validate)
        ProcedureNamespace (org.apache.calcite.sql.validate)
        WithItemNamespace (org.apache.calcite.sql.validate)
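The wrapping rule from the SqlValidator description and the setType behavior can be illustrated with a toy model. ToyNamespaceDemo, WrappingNamespace and the List<String> row type are assumptions for illustration only (Calcite uses RelDataType and its own wrapper classes). The point is that instanceof fails on a wrapper, while isWrapperFor and unwrap still reach the inner namespace.

```java
import java.util.List;

// Toy model (hypothetical, simplified) of namespace wrapping and setType.
class ToyNamespaceDemo {

  interface Namespace {
    List<String> getType();
    void setType(List<String> type);
    /** Returns this namespace, or one it wraps, as the requested class. */
    <T> T unwrap(Class<T> clazz);
    boolean isWrapperFor(Class<?> clazz);
  }

  static class SelectNamespace implements Namespace {
    private List<String> type; // null until validation has run

    @Override public List<String> getType() { return type; }
    @Override public void setType(List<String> type) { this.type = List.copyOf(type); }
    @Override public <T> T unwrap(Class<T> clazz) { return clazz.cast(this); }
    @Override public boolean isWrapperFor(Class<?> clazz) { return clazz.isInstance(this); }
  }

  /** Forwards everything to the namespace it wraps. */
  static class WrappingNamespace implements Namespace {
    private final Namespace inner;

    WrappingNamespace(Namespace inner) { this.inner = inner; }

    @Override public List<String> getType() { return inner.getType(); }
    @Override public void setType(List<String> type) { inner.setType(type); }
    @Override public <T> T unwrap(Class<T> clazz) {
      return clazz.isInstance(this) ? clazz.cast(this) : inner.unwrap(clazz);
    }
    @Override public boolean isWrapperFor(Class<?> clazz) {
      return clazz.isInstance(this) || inner.isWrapperFor(clazz);
    }
  }

  public static void main(String[] args) {
    Namespace ns = new WrappingNamespace(new SelectNamespace());
    System.out.println(ns instanceof SelectNamespace);          // a cast would fail
    System.out.println(ns.isWrapperFor(SelectNamespace.class)); // but the inner one is there
    ns.unwrap(SelectNamespace.class).setType(List.of("id INTEGER", "name VARCHAR"));
    System.out.println(ns.getType());
  }
}
```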

The figure below shows a SelectNamespace instance after validation has finished, so its type field has already been set:

[Figure: SelectNamespace]

Code flow

The overall validation sequence diagram:

[Figure: validate sequence diagram]

validate()
  @Override public SqlNode validate(SqlNode topNode) {
    // 1. The root scope: an EmptyScope wrapped in a CatalogScope
    SqlValidatorScope scope = new EmptyScope(this);
    scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
    // 2. ☆ Validation proper: all of the checking happens inside this method
    final SqlNode topNode2 = validateScopedExpression(topNode, scope);
    // 3. Get the validated type of the node
    final RelDataType type = getValidatedNodeType(topNode2);
    // 4. Discard the value so the compiler does not warn about an unused variable
    Util.discard(type);
    return topNode2;
  }
validateScopedExpression()
  private SqlNode validateScopedExpression(
      SqlNode topNode,
      SqlValidatorScope scope) {
    // 1. Normalize the SQL: rewrite non-standard forms into standard ones
    //    so that later validation and optimization are simpler
    // SqlOrderBy -> SqlSelect
    // SqlDelete, SqlUpdate, SqlMerge -> an equivalent source SqlSelect is built
    // VALUES -> SqlSelect
    // explicit TABLE, e.g. 'select * from (TABLE t)' -> SqlSelect
    SqlNode outermostNode = performUnconditionalRewrites(topNode, false);
    cursorSet.add(outermostNode);
    top = outermostNode;
    TRACER.trace("After unconditional rewrite: {}", outermostNode);
    if (outermostNode.isA(SqlKind.TOP_LEVEL)) {
      // 2. Register scopes and namespaces
      registerQuery(scope, null, outermostNode, outermostNode, null, false);
    }
    // 3. ☆ Start validation by calling the SqlNode's validate method; because of
    //    the visitor pattern, this dispatches back into methods of this class
    outermostNode.validate(this, scope);
    if (!outermostNode.isA(SqlKind.TOP_LEVEL)) {
      // 4. Derive the type
      deriveType(scope, outermostNode);
    }
    TRACER.trace("After validation: {}", outermostNode);
    return outermostNode;
  }
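A simplified sketch of one unconditional rewrite, ORDER BY folding, using hypothetical record types rather than Calcite's SqlOrderBy/SqlSelect. It assumes behavior roughly like Calcite's: an ORDER BY over a SELECT that has no ORDER BY of its own is folded into that SELECT, and anything else is wrapped as SELECT * FROM (query) ORDER BY .... Calcite mutates the existing SqlSelect; these immutable records return new nodes instead. Records need Java 16 or later.

```java
// Toy sketch of ORDER BY folding; the node types are hypothetical, not Calcite's.
class ToyRewriteDemo {

  interface Node {}

  record Table(String name) implements Node {}

  record Select(String selectList, Node from, String orderBy) implements Node {}

  record OrderBy(Node query, String orderList) implements Node {}

  /** Rewrites ORDER BY wrappers so that only Select nodes carry an ORDER BY. */
  static Node performUnconditionalRewrites(Node node) {
    if (!(node instanceof OrderBy)) {
      return node;
    }
    OrderBy orderBy = (OrderBy) node;
    Node query = performUnconditionalRewrites(orderBy.query());
    if (query instanceof Select && ((Select) query).orderBy() == null) {
      Select select = (Select) query;
      // Fold the ORDER BY into the existing SELECT.
      return new Select(select.selectList(), select.from(), orderBy.orderList());
    }
    // Don't clobber an existing ORDER BY: wrap as SELECT * FROM (query) ORDER BY ...
    return new Select("*", query, orderBy.orderList());
  }

  public static void main(String[] args) {
    Node parsed = new OrderBy(new Select("id, name", new Table("Products"), null), "id");
    System.out.println(performUnconditionalRewrites(parsed));
  }
}
```

After this step, every later stage can assume ORDER BY lives on a Select, which is the "simpler rest of the validation logic" the rewrite is for.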
registerQuery()

registerQuery registers the scopes and namespaces. The objects after it has run are shown below:

[Figure: scopes & namespaces]
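A toy version of the registration pass, assuming hypothetical record types for the tree. Before anything is validated, one walk records a namespace for every SELECT and table and a scope for every SELECT, recursing through joins and FROM subqueries. Like Calcite's validator, it keys the maps by node identity (IdentityHashMap), so the two structurally identical Products tables get separate entries. Calcite additionally attaches each FROM namespace to the SELECT's scope for name lookup; that part is left out here.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Toy sketch of registerQuery/registerFrom; all types here are hypothetical.
class ToyRegisterDemo {

  interface Node {}
  record Table(String name) implements Node {}
  record Join(Node left, Node right) implements Node {}
  record Select(String alias, Node from) implements Node {}

  record Namespace(Node node) {}
  record Scope(Scope parent, Node node) {}

  // Keyed by node identity, so textually equal nodes stay separate.
  final Map<Node, Namespace> namespaces = new IdentityHashMap<>();
  final Map<Node, Scope> scopes = new IdentityHashMap<>();

  /** Registers a SELECT: its namespace, its scope, then its FROM items. */
  void registerQuery(Scope parentScope, Select select) {
    namespaces.put(select, new Namespace(select));
    scopes.put(select, new Scope(parentScope, select));
    // A FROM subquery's own scope gets the enclosing scope as its parent, not this
    // SELECT's scope: a (non-LATERAL) subquery cannot see its siblings' columns.
    registerFrom(parentScope, select.from());
  }

  void registerFrom(Scope parentScope, Node from) {
    if (from instanceof Join) {
      Join join = (Join) from;
      registerFrom(parentScope, join.left());
      registerFrom(parentScope, join.right());
    } else if (from instanceof Select) {
      registerQuery(parentScope, (Select) from); // subquery: namespace + scope
    } else {
      namespaces.put(from, new Namespace(from)); // table: namespace only
    }
  }

  /** Same shape as the article's example: (SELECT ...) a JOIN (SELECT ...) b. */
  static Select exampleQuery() {
    Select a = new Select("a", new Table("Products"));
    Select b = new Select("b", new Table("Products"));
    return new Select(null, new Join(a, b));
  }

  public static void main(String[] args) {
    ToyRegisterDemo validator = new ToyRegisterDemo();
    validator.registerQuery(null, exampleQuery()); // null stands in for the root scope
    System.out.println(validator.namespaces.size() + " namespaces, "
        + validator.scopes.size() + " scopes");
  }
}
```

For the example shape this registers five namespaces (three SELECTs, two tables) and three scopes.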

outermostNode.validate()

This is the core of validation; it eventually reaches SqlValidatorImpl#validateSelect. For the call chain, see the sequence diagram above.

validator.validateSelect()

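Validates a SELECT. The key points:

It uses the scope to check data types, UDFs and so on, and it may rewrite the SQL: for example, in select * from t, the * is expanded into the corresponding columns based on the context. validateFrom recursively validates down to the innermost subquery. When validation finishes, the type of each subquery is known and is stored in the corresponding namespace via namespace.setType().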
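The bottom-up flow just described (validate the FROM first, expand * from its row type, then record the result) can be sketched with a toy model. Everything here is hypothetical and simplified: row types are plain column-name lists, the types map stands in for namespace.setType(), and unlike Calcite the sketch only computes the expanded row type instead of rewriting the select list in the tree.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of recursive FROM validation and * expansion; types are hypothetical.
class ToyExpandDemo {

  interface Node {}
  record Table(String name, List<String> columns) implements Node {}
  record Select(List<String> selectList, Node from) implements Node {}

  // Row type of each validated SELECT, standing in for namespace.setType(...).
  final Map<Select, List<String>> types = new IdentityHashMap<>();

  List<String> validateFrom(Node from) {
    if (from instanceof Table) {
      return ((Table) from).columns();
    }
    return validateSelect((Select) from); // a subquery is validated first, bottom-up
  }

  List<String> validateSelect(Select select) {
    List<String> fromType = validateFrom(select.from());
    List<String> rowType = new ArrayList<>();
    for (String item : select.selectList()) {
      if (item.equals("*")) {
        rowType.addAll(fromType); // expand * using the FROM item's row type
      } else if (fromType.contains(item)) {
        rowType.add(item);
      } else {
        throw new IllegalArgumentException("Column '" + item + "' not found");
      }
    }
    types.put(select, List.copyOf(rowType));
    return types.get(select);
  }

  public static void main(String[] args) {
    Table products = new Table("Products", List.of("id", "seller", "name", "price"));
    Select inner = new Select(List.of("id", "name"), products);
    Select outer = new Select(List.of("*"), inner);
    System.out.println(new ToyExpandDemo().validateSelect(outer));
  }
}
```

Here SELECT * over (SELECT id, name FROM Products) expands to [id, name], and both SELECTs end up with a recorded row type.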

The validation methods run on a SELECT are:

validateFeature()
validateFrom()
validateWhereClause()
validateGroupClause()
validateHavingClause()
validateWindowClause()
validateSelectList()
validateOrderList()
Miscellaneous

Example code used in this article
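import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// The class name is arbitrary; StringToStringTestUdf is the author's own UDF (not shown here).
public class FlinkSqlValidateExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner, streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // Source table backed by a local CSV file
        tableEnv.executeSql("CREATE TABLE Products (\n" +
            "    id         INT\n" +
            "    ,seller     INT\n" +
            "    ,name       STRING\n" +
            "    ,price      INT\n" +
            ") WITH (\n" +
            "  'connector' = 'filesystem',\n" +
            "  'path'='file:///Users/zart/data/flink/product_simple',\n" +
            "  'format'='csv'\n" +
            ")");

        // Sink table backed by a local CSV path
        tableEnv.executeSql("CREATE TABLE ProductsResult (\n" +
            "    id              INT\n" +
            "    ,name           STRING\n" +
            "    ,price          INT\n" +
            "    ,test_name      STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'filesystem',\n" +
            "  'path'='file:///Users/zart/data/flink/product_result',\n" +
            "  'format'='csv'\n" +
            ")");

        // Register the UDF used in the query
        tableEnv.createTemporaryFunction("string_test_udf", new StringToStringTestUdf());

        // The SELECT inside this INSERT is what gets validated in the call stack below
        String sql = "INSERT INTO ProductsResult \n"
            + "    SELECT a.*, b.price, string_test_udf(a.*) AS test_name \n"
            + "    FROM \n"
            + "        (SELECT id, name FROM Products) a \n"
            + "    LEFT JOIN \n"
            + "        (SELECT id, price FROM Products) b \n"
            + "    ON a.id=b.id";
        TableResult result = tableEnv.executeSql(sql);
        result.print();
    }
}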
Call stack
ParserImpl#parse    1. SQL -> SqlNode  2. SqlNode -> Operation    insert into ProductsResult select a.*, b.price, string_test_udf(a.*) as test_name from (select id, name from Products) a left join (select id, price from Products) b on a.id=b.id
    CalciteParser#parse    SQL -> SqlNode
    SqlToOperationConverter#convert   SqlNode -> Operation
        FlinkPlannerImpl#validate   the SqlKind is INSERT, so the SqlNode is returned as-is
        SqlToOperationConverter#new SqlToOperationConverter
        SqlToOperationConverter#convertSqlInsert
            SqlToOperationConverter#convert   second call to convert; the SqlNode is now the SqlSelect (insert.getSource())
                FlinkPlannerImpl#validate   validation starts (a SqlValidatorScope is created further down and passed to validateScopedExpression)
                    FlinkPlannerImpl#getOrCreateSqlValidator
                    FlinkPlannerImpl#validate(sqlNode, validator)   passes sqlNode and the validator to validate to start validation
                        FlinkPlannerImpl#validator.validate(sqlNode)   ⭐️ actual validation starts here
                            SqlValidatorImpl#scope = new CatalogScope
                            SqlValidatorImpl#validateScopedExpression(topNode, scope)
                                SqlValidatorImpl#performUnconditionalRewrites   performs the expression rewrites that are always applied; they bring the expression tree into a standard form so the rest of the validation logic can be simpler
                                SqlValidatorImpl#registerQuery   registers scopes and namespaces (TOP_LEVEL nodes only)
                                    SqlValidatorImpl#SqlSelect select = (SqlSelect) node
                                    SqlValidatorImpl#createSelectNamespace(select, enclosingNode)
                                    SqlValidatorImpl#registerNamespace(usingScope, alias, selectNs, forceNullable)
                                        SqlValidatorImpl#Map namespaces.put(ns.getNode(), ns)   registers the SqlValidatorNamespace for the SqlNode
                                    SqlValidatorImpl#Map scopes.put(select, selectScope)   registers the SqlValidatorScope for the SqlNode
                                    SqlValidatorImpl#registerFrom   recursively registers the FROM items
                                    SqlValidatorImpl#registerOperandSubQueries
                                    SqlValidatorImpl#registerSubQueries
                                SqlValidatorImpl#outermostNode.validate((SqlSelect)this, (CatalogScope)scope)
                                    SqlValidatorImpl#validateQuery
                                        SqlValidatorImpl#getNamespace(node, scope)   gets the SelectNamespace via namespaces.get(key)
                                            SqlValidatorImpl#validateNamespace((SelectNamespace)ns, targetRowType)
                                                SqlValidatorImpl#namespace.validate
                                                    SelectNamespace#this.validateImpl(targetRowType)
                                                        SelectNamespace#this.validator.validateSelect((SqlSelect)this.select, targetRowType)
                                                            SqlValidatorImpl#validateSelect((SqlSelect) select, (RelDataType) targetRowType)  ⭐️ SELECT validation starts; targetRowType is UNKNOWN here. Recursively validates from (join(left, right)), where, group, having, window, selectList and orderList
                                                                SqlValidatorImpl#SelectNamespace ns = getNamespace(select).unwrap(SelectNamespace.class)   gets the namespace
                                                                SqlValidatorImpl#SelectScope fromScope = (SelectScope) getFromScope(select)   gets the scope
                                                                SqlValidatorImpl#validateFrom
                                                                    SqlValidatorImpl#validateJoin
                                                                        SqlValidatorImpl#validateFrom(left, unknownType, joinScope)
                                                                            SqlValidatorImpl#validateFrom(((SqlCall) node).operand(0), targetRowType, scope)
                                                                                SqlValidatorImpl#validateQuery
                                                                                    SqlValidatorImpl#SqlValidatorNamespace ns = getNamespace(node, scope)
                                                                                        SqlValidatorImpl#validateNamespace
                                                                                            AbstractNamespace#validate
                                                                                                SelectNamespace#validateImpl   returns the RelDataType: RecordType(INTEGER id, INTEGER seller, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name, INTEGER price) NOT NULL
                                                                                                    SqlValidatorImpl#validateSelect((SqlSelect) select, (RelDataType) targetRowType)
                                                                                        SqlValidatorImpl#validateModality   top node only
                                                                                        SqlValidatorImpl#validateAccess
                                                                                        SqlValidatorImpl#validateSnapshot
                                                                        SqlValidatorImpl#validateFrom(right, unknownType, joinScope)
                                                                SqlValidatorImpl#validateWhereClause
                                                                SqlValidatorImpl#validateGroupClause
                                                                SqlValidatorImpl#validateHavingClause
                                                                SqlValidatorImpl#validateWindowClause
                                                                SqlValidatorImpl#validateSelectList
                                                                SqlValidatorImpl#validateOrderList
                            SqlValidatorImpl#getValidatedNodeType
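To see why the FROM part of this stack nests the way it does, here is a toy that replays the same recursion for the example query's shape, (SELECT ...) a LEFT JOIN (SELECT ...) b: FROM, then JOIN, then each input, then the AS operand, then the subquery's own SELECT. The node records and the trace format are hypothetical; only the recursion order is meant to mirror the stack, and the clauses after FROM are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Toy replay of the validateFrom recursion; types and trace format are hypothetical.
class ToyFromTraceDemo {

  interface Node {}
  record Table(String name) implements Node {}
  record As(Node query, String alias) implements Node {}
  record Join(Node left, Node right) implements Node {}
  record Select(Node from) implements Node {}

  final List<String> trace = new ArrayList<>();

  void validateSelect(Select select, int depth) {
    log(depth, "validateSelect");
    validateFrom(select.from(), depth + 1);
    // WHERE, GROUP BY, HAVING, WINDOW, select list and ORDER BY would follow here.
  }

  void validateFrom(Node node, int depth) {
    log(depth, "validateFrom");
    if (node instanceof Join) {
      Join join = (Join) node;
      log(depth + 1, "validateJoin");
      validateFrom(join.left(), depth + 2);
      validateFrom(join.right(), depth + 2);
    } else if (node instanceof As) {
      validateFrom(((As) node).query(), depth + 1); // like operand(0) of AS
    } else if (node instanceof Select) {
      log(depth + 1, "validateQuery");
      validateSelect((Select) node, depth + 2);
    }
    // A Table needs no further recursion in this sketch.
  }

  private void log(int depth, String step) {
    trace.add("  ".repeat(depth) + step);
  }

  static Select exampleQuery() {
    Node a = new As(new Select(new Table("Products")), "a");
    Node b = new As(new Select(new Table("Products")), "b");
    return new Select(new Join(a, b));
  }

  public static void main(String[] args) {
    ToyFromTraceDemo demo = new ToyFromTraceDemo();
    demo.validateSelect(exampleQuery(), 0);
    demo.trace.forEach(System.out::println);
  }
}
```

main prints 13 indented lines; the two validateQuery entries correspond to subqueries a and b, matching the left and right branches under validateJoin in the stack.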

Feel free to share; please credit the source when reposting: 内存溢出

Original article: http://outofmemory.cn/zaji/5701912.html
