Shandong University Software Engineering Application and Practice: Spark (11) Code Analysis


2021SC@SDUSC

8.6.1 Analyzer

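The analyzer(logical) statement in QueryExecution actually calls the apply method of RuleExecutor, the parent class of Analyzer, to apply Analyzer's own batches. The FixedPoint of Analyzer is currently hard-coded to 100; its comment indicates that the value will be passed in as a parameter in the future. Additional Rules can be supplied by subclassing Analyzer and overriding extendedRules.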


The implementation of Analyzer:

// Upper bound on iterations for the batches that run to a fixed point.
val fixedPoint = FixedPoint(100)

// Override in a subclass to add rules to the "Resolution" batch.
val extendedRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
        NewRelationInstances),
    Batch("Resolution", fixedPoint,
        ResolveReferences ::
        ResolveRelations ::
        ResolveSortReferences ::
        NewRelationInstances ::
        ImplicitGenerate ::
        StarExpansion ::
        ResolveFunctions ::
        GlobalAggregates ::
        UnresolvedHavingClauseAttributes ::
        TrimGroupingAliases ::
        typeCoercionRules ++
        extendedRules : _*),
    Batch("Check Analysis", Once,
        CheckResolution,
        CheckAggregation),
    Batch("AnalysisOperators", fixedPoint,
        EliminateAnalysisOperators)
)
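To make the batch mechanism concrete, here is a minimal sketch of how a rule executor might run its batches, either once or repeatedly until the plan stops changing. It is not Spark's RuleExecutor: the "plan" is a plain String, and Rule, Batch, ToyAnalyzer, ExtendedAnalyzer, CollapseSpaces and Trim are hypothetical names chosen only for illustration. The one thing it borrows from the listing above is the shape: a fixedPoint of 100 and an extendedRules hook that a subclass can override.

```scala
// Toy model of a rule executor. Every name below is illustrative, not Spark's API.
object RuleExecutorSketch {
  // A rule rewrites a plan. The "plan" is a plain String to keep the sketch small.
  trait Rule { def apply(plan: String): String }

  // How often a batch is run: once, or until nothing changes (bounded).
  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch(name: String, strategy: Strategy, rules: Rule*)

  abstract class RuleExecutor {
    protected def batches: Seq[Batch]

    // Run the batches in order; inside a batch, apply all rules in sequence and
    // repeat until the plan is unchanged or the iteration limit is reached.
    def apply(plan: String): String =
      batches.foldLeft(plan) { (input, batch) =>
        var current = input
        var iteration = 0
        var changed = true
        while (changed && iteration < batch.strategy.maxIterations) {
          val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
          changed = next != current
          current = next
          iteration += 1
        }
        current
      }
  }

  // Needs several passes: each pass only halves a run of spaces.
  object CollapseSpaces extends Rule {
    def apply(plan: String): String = plan.replace("  ", " ")
  }

  object Trim extends Rule {
    def apply(plan: String): String = plan.trim
  }

  class ToyAnalyzer extends RuleExecutor {
    val fixedPoint = FixedPoint(100)
    // Hook for subclasses, mirroring extendedRules in the listing above.
    val extendedRules: Seq[Rule] = Nil
    // Lazy, so an overridden extendedRules is initialized before it is read.
    lazy val batches: Seq[Batch] = Seq(
      Batch("Resolution", fixedPoint, (CollapseSpaces +: extendedRules): _*)
    )
  }

  class ExtendedAnalyzer extends ToyAnalyzer {
    override val extendedRules: Seq[Rule] = Seq(Trim)
  }

  def main(args: Array[String]): Unit = {
    println("[" + new ToyAnalyzer().apply("  a    b  ") + "]")
    println("[" + new ExtendedAnalyzer().apply("  a    b  ") + "]")
  }
}
```

In this sketch a single pass of CollapseSpaces is not enough to reach a stable result, which is the situation a fixed-point strategy is meant for. Declaring batches as lazy also matters: it lets the subclass's extendedRules take effect before the batch list is built.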

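Analyzer already ships with many built-in Rules, including ResolveReferences, ResolveRelations, and StarExpansion. After being processed by Analyzer, the Unresolved LogicalPlan becomes a Resolved LogicalPlan. We use ResolveRelations as an example to get a rough picture of how Analyzer works. ResolveRelations replaces each part of the LogicalPlan that matches UnresolvedRelation with the LogicalPlan registered in the Catalog (the table dictionary); see the code: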

The apply method of ResolveRelations in Analyzer:


object ResolveRelations extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
            i.copy(
                table = EliminateAnalysisOperators(
                    catalog.lookupRelation(tableIdentifier, alias)))
        case UnresolvedRelation(tableIdentifier, alias) =>
            catalog.lookupRelation(tableIdentifier, alias)
    }
}
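The lookup-and-replace idea behind ResolveRelations can be sketched on a toy plan tree. The types below (Plan, Project, ResolvedTable, SimpleCatalog) and the simplified single-argument lookupRelation are assumptions made for this example and are far simpler than Spark's classes. The transform here applies the rule to a node first and then to its children, which is also the default direction of TreeNode.transform in Spark.

```scala
// Toy version of relation resolution. All types are hypothetical stand-ins.
object ResolveRelationsSketch {
  sealed trait Plan {
    // Pre-order rewrite: try the rule on this node, then recurse into children.
    def transform(rule: PartialFunction[Plan, Plan]): Plan = {
      val afterRule = rule.applyOrElse(this, (p: Plan) => p)
      afterRule match {
        case Project(columns, child) => Project(columns, child.transform(rule))
        case leaf                    => leaf
      }
    }
  }
  case class UnresolvedRelation(tableName: String) extends Plan
  case class ResolvedTable(tableName: String, columns: Seq[String]) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // A minimal catalog: table name -> column names.
  class SimpleCatalog(tables: Map[String, Seq[String]]) {
    def lookupRelation(tableName: String): Plan =
      tables.get(tableName) match {
        case Some(columns) => ResolvedTable(tableName, columns)
        case None          => sys.error(s"Table not found: $tableName")
      }
  }

  // Replace every UnresolvedRelation with whatever the catalog has registered.
  def resolveRelations(catalog: SimpleCatalog)(plan: Plan): Plan = plan transform {
    case UnresolvedRelation(tableName) => catalog.lookupRelation(tableName)
  }

  def main(args: Array[String]): Unit = {
    val catalog = new SimpleCatalog(Map("people" -> Seq("name", "age")))
    val plan = Project(Seq("name"), UnresolvedRelation("people"))
    println(resolveRelations(catalog)(plan))
  }
}
```

Nodes that the partial function does not match are left untouched, so a rule only has to describe the cases it cares about.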

8.6.2 Optimizer

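Like Analyzer, Optimizer applies its own batches through the apply method of its parent class RuleExecutor. The default implementation of Optimizer is DefaultOptimizer; see code listing 8-25. DefaultOptimizer also has many built-in Rules, such as NullPropagation and ConstantFolding. Optimizer optimizes the Resolved LogicalPlan and produces the Optimized LogicalPlan.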

The implementation of Optimizer:


abstract class Optimizer extends RuleExecutor[LogicalPlan]

object DefaultOptimizer extends Optimizer {
    val batches =
        Batch("Combine Limits", FixedPoint(100),
            CombineLimits) ::
        Batch("ConstantFolding", FixedPoint(100),
            NullPropagation,
            ConstantFolding,
            LikeSimplification,
            BooleanSimplification,
            SimplifyFilters,
            SimplifyCasts,
            SimplifyCaseConversionExpressions,
            OptimizeIn) ::
        Batch("Decimal Optimizations", FixedPoint(100),
            DecimalAggregates) ::
        Batch("Filter Pushdown", FixedPoint(100),
            UnionPushdown,
            CombineFilters,
            PushPredicateThroughProject,
            PushPredicateThroughJoin,
            ColumnPruning) :: Nil
}
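The listing names ConstantFolding but does not show how such a rule works. As a rough illustration of the general technique only (this is not Spark's implementation), the sketch below folds additions of integer literals in a tiny expression tree. Expr, Lit, Col, Add and fold are hypothetical names introduced for this example.

```scala
// Constant folding on a toy expression tree. Not Spark's ConstantFolding rule.
object ConstantFoldingSketch {
  sealed trait Expr
  case class Lit(value: Int) extends Expr          // a literal constant
  case class Col(name: String) extends Expr        // a column reference, unknown until runtime
  case class Add(left: Expr, right: Expr) extends Expr

  // Fold bottom-up: simplify both operands first, then combine them
  // if both turned out to be literals.
  def fold(expr: Expr): Expr = expr match {
    case Add(left, right) =>
      (fold(left), fold(right)) match {
        case (Lit(a), Lit(b))           => Lit(a + b)
        case (foldedLeft, foldedRight)  => Add(foldedLeft, foldedRight)
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // x + (1 + 2): only the literal subtree can be evaluated ahead of time.
    println(fold(Add(Col("x"), Add(Lit(1), Lit(2)))))
  }
}
```

The payoff is that the constant subtree is evaluated once during optimization instead of once per row at execution time.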

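Whether a Rule is built into Analyzer or into DefaultOptimizer, it is applied to a LogicalPlan through the transform family of functions in TreeNode. Take SimplifyFilters as an example. Its optimizations include:
1. If the filter condition always evaluates to true, remove the filter, since it has no effect.
2. If the filter condition always evaluates to null or false, replace the input with an empty relation, that is, filter out all of the input.
As the implementation of SimplifyFilters shows, it passes its own rule as an argument to the transform function; see the code: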

The implementation of SimplifyFilters in Optimizer:


object SimplifyFilters extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // The condition is always true: drop the filter and keep its child.
        case Filter(Literal(true, BooleanType), child) => child
        // The condition is always null or false: replace the input with an empty relation.
        case Filter(Literal(null, _), child) =>
            LocalRelation(child.output, data = Seq.empty)
        case Filter(Literal(false, BooleanType), child) =>
            LocalRelation(child.output, data = Seq.empty)
    }
}
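The same pattern can be tried out without Spark. The sketch below is a simplified model under stated assumptions: Plan, Relation, Filter, Limit, Literal and Attribute are toy types, Relation stands in for LocalRelation, and transform is a hand-written pre-order traversal rather than TreeNode's own. It shows the two cases from the list above expressed as a partial function handed to transform.

```scala
// Toy model of SimplifyFilters. Types are simplified stand-ins, not Spark classes.
object SimplifyFiltersSketch {
  sealed trait Expr
  case class Literal(value: Any) extends Expr
  case class Attribute(name: String) extends Expr

  sealed trait Plan { def output: Seq[String] }
  case class Relation(output: Seq[String], rows: Seq[Seq[Any]]) extends Plan
  case class Filter(condition: Expr, child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }
  case class Limit(n: Int, child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }

  // Pre-order rewrite: apply the rule to the node, then to the children of the result.
  def transform(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = rule.applyOrElse(plan, (p: Plan) => p)
    rewritten match {
      case Filter(condition, child) => Filter(condition, transform(child)(rule))
      case Limit(n, child)          => Limit(n, transform(child)(rule))
      case leaf                     => leaf
    }
  }

  def simplifyFilters(plan: Plan): Plan = transform(plan) {
    // Always true: the filter has no effect, keep only its child.
    case Filter(Literal(true), child) => child
    // Always null or false: nothing can pass, so use an empty relation.
    case Filter(Literal(null), child)  => Relation(child.output, Seq.empty)
    case Filter(Literal(false), child) => Relation(child.output, Seq.empty)
  }

  def main(args: Array[String]): Unit = {
    val table = Relation(Seq("id"), Seq(Seq(1), Seq(2)))
    println(simplifyFilters(Filter(Literal(true), table)))
    println(simplifyFilters(Limit(1, Filter(Literal(false), table))))
  }
}
```

Note that in this sketch one pass applies the rule to each node at most once, so a plan such as a true filter wrapped directly around a false filter needs a second pass. That is one reason for running such rules in a batch with a fixed-point strategy.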


 

Sharing is welcome. When reposting, please credit the source: 内存溢出

Original address: http://outofmemory.cn/zaji/5654466.html
