Shandong University Software Engineering Application and Practice: Spark (12) Code Analysis

2021SC@SDUSC

1. Generating the physical execution plan:

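        After processing by SqlParser, Analyzer, and Optimizer, the resulting logical execution plan still cannot be handled like an ordinary Job. To treat it the same way as any other Job, the logical execution plan must be converted into a physical execution plan.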

Physical execution plan: SparkPlan

        The conversion starts with SparkPlanner, which in fact uses the apply method it inherits from QueryPlanner:


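def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Lazily apply each strategy in order and concatenate the candidate physical plans
    val iter = strategies.view.flatMap(_(plan)).toIterator
    // Fail fast if no strategy was able to plan this node
    assert(iter.hasNext, s"No plan for $plan")
    iter
}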
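To make the lazy search in apply concrete, here is a minimal sketch written as a Scala script (for example a .sc file). Plans are modeled as plain strings, and the three strategies (limitOverSort, catchAll, neverReached) and the helper strategy are hypothetical stand-ins, not Spark classes; only the view/flatMap shape mirrors the code above, using .iterator in place of .toIterator (deprecated in Scala 2.13). The sketch records which strategies were consulted, showing that strategies are tried in order and that later ones are never invoked once an earlier one has produced a candidate.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in: a "plan" is just a String, and a strategy returns zero or
// more candidate physical plans for it (Spark uses LogicalPlan/SparkPlan trees).
type Strategy = String => Seq[String]

// Names of the strategies that were actually invoked, to make the laziness visible.
val consulted = ArrayBuffer.empty[String]

def strategy(name: String)(pf: PartialFunction[String, String]): Strategy = { plan =>
  consulted += name
  pf.lift(plan).toList // no match yields an empty Seq, so the next strategy is tried
}

// Order matters: earlier strategies are consulted first.
val strategies: Seq[Strategy] = Seq(
  strategy("limitOverSort") { case p if p.startsWith("Limit(Sort") => s"TopK[$p]" },
  strategy("catchAll") { case p => s"Basic[$p]" },
  strategy("neverReached") { case p => s"Never[$p]" }
)

// Same shape as QueryPlanner.apply: a lazy view, flattened into an iterator.
def planAll(plan: String): Iterator[String] = {
  val iter = strategies.view.flatMap(s => s(plan)).iterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}

println(planAll("Filter(x)").next()) // Basic[Filter(x)]
println(consulted.mkString(", "))    // limitOverSort, catchAll
```

Because the view is lazy, putting a cheap, specific strategy early in the list costs nothing for plans it does not match, and a catch-all later in the list shadows everything after it.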

The strategies defined in SparkPlanner are:


// extraStrategies come first, so user-supplied strategies are tried before the built-in ones
def strategies: Seq[Strategy] =
    extraStrategies ++ (
    CommandStrategy(self) ::
    DataSourceStrategy ::
    TakeOrdered ::
    HashAggregation ::
    LeftSemiJoin ::
    HashJoin ::
    InMemoryScans ::
    ParquetOperations ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil)

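        Every Strategy implements the apply method. Among these strategies, BasicOperators is the most commonly used; its implementation in SparkStrategies is shown below, and it handles the most common SQL keywords. Each branch first calls planLater (QueryPlanner's planLater method, also shown below), which applies SparkPlanner to the LogicalPlan of the child node. This produces a recursive process that ultimately converts the entire LogicalPlan tree with SparkPlanner.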
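The recursion that planLater creates can be sketched with a toy planner. Everything below (the Logical and Physical case classes, ToyPlanner, and its two strategies) is a hypothetical simplification rather than Spark's API. The point is that a strategy handling a parent node calls planLater on its child, which re-enters apply, so planning the root ends up planning the whole tree.

```scala
// Hypothetical toy plan trees (not Spark's LogicalPlan/SparkPlan classes).
sealed trait Logical
case class Relation(name: String) extends Logical
case class Project(cols: List[String], child: Logical) extends Logical
case class Filter(cond: String, child: Logical) extends Logical

sealed trait Physical
case class ScanExec(name: String) extends Physical
case class ProjectExec(cols: List[String], child: Physical) extends Physical
case class FilterExec(cond: String, child: Physical) extends Physical

object ToyPlanner {
  type Strategy = Logical => Seq[Physical]

  // Like BasicOperators: each branch plans its child via planLater.
  val basicOperators: Strategy = {
    case Project(cols, child) => ProjectExec(cols, planLater(child)) :: Nil
    case Filter(cond, child)  => FilterExec(cond, planLater(child)) :: Nil
    case _                    => Nil // not handled here; another strategy may apply
  }

  // Leaf strategy: turns a table reference into a scan.
  val scans: Strategy = {
    case Relation(name) => ScanExec(name) :: Nil
    case _              => Nil
  }

  val strategies: Seq[Strategy] = Seq(scans, basicOperators)

  // Same shape as QueryPlanner.apply.
  def apply(plan: Logical): Iterator[Physical] = {
    val iter = strategies.view.flatMap(s => s(plan)).iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }

  // Re-enter the planner for a child node and keep the first candidate.
  def planLater(plan: Logical): Physical = apply(plan).next()
}

val tree = Project(List("name"), Filter("age > 21", Relation("people")))
println(ToyPlanner.planLater(tree))
// ProjectExec(List(name),FilterExec(age > 21,ScanExec(people)))
```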

Implementation of BasicOperators in SparkStrategies:


object BasicOperators extends Strategy {
    def numPartitions = self.numPartitions

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        // DISTINCT is planned in two phases: a partial Distinct inside a final one
        case logical.Distinct(child) =>
            execution.Distinct(partial = false,
                execution.Distinct(partial = true, planLater(child))) :: Nil

        // Global sort: use the external (spillable) sort when it is enabled
        case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
            execution.ExternalSort(sortExprs, global = true, planLater(child)) :: Nil
        case logical.Sort(sortExprs, child) =>
            execution.Sort(sortExprs, global = true, planLater(child)) :: Nil

        // Sorts only within each partition (global = false)
        case logical.SortPartitions(sortExprs, child) =>
            execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
        case logical.Project(projectList, child) =>
            execution.Project(projectList, planLater(child)) :: Nil
        case logical.Filter(condition, child) =>
            execution.Filter(condition, planLater(child)) :: Nil
        case logical.Aggregate(group, agg, child) =>
            execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
        case logical.Sample(fraction, withReplacement, seed, child) =>
            execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
        // A plan that is already physical is passed through unchanged
        case SparkLogicalPlan(alreadyPlanned) => alreadyPlanned :: Nil
        // Local data is parallelized into an RDD (a single partition if it is empty)
        case logical.LocalRelation(output, data) =>
            val nPartitions = if (data.isEmpty) 1 else numPartitions
            PhysicalRDD(
                output,
                RDDConversions.productToRowRdd(sparkContext.parallelize(data, nPartitions),
                    StructType.fromAttributes(output))) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
            execution.Limit(limit, planLater(child)) :: Nil
        case Unions(unionChildren) =>
            execution.Union(unionChildren.map(planLater)) :: Nil
        case logical.Except(left, right) =>
            execution.Except(planLater(left), planLater(right)) :: Nil
        case logical.Intersect(left, right) =>
            execution.Intersect(planLater(left), planLater(right)) :: Nil
        case logical.Generate(generator, join, outer, _, child) =>
            execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
        // No input relation: backed by an RDD holding a single empty row
        case logical.NoRelation =>
            execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
        // Repartition becomes a hash-partitioned shuffle (Exchange)
        case logical.Repartition(expressions, child) =>
            execution.Exchange(HashPartitioning(expressions, numPartitions),
                planLater(child)) :: Nil
        case e @ EvaluatePython(udf, child, _) =>
            BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
        case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
        // Not handled here: return Nil so that another strategy can try
        case _ => Nil
    }
}

QueryPlanner's planLater method:


// Plan a child node: re-enter the planner and take the first candidate plan
protected def planLater(plan: LogicalPlan) = apply(plan).next()

        In essence, BasicOperators converts each logical.XXX node into the corresponding execution.XXX node, and converts LogicalRDD into PhysicalRDD.
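        prepareForExecution does the preparatory work before execution. It works on the same principle as Analyzer and Optimizer (rules applied to the plan tree), so it is not covered in detail here.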
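The naming pattern that BasicOperators follows (logical.X becomes execution.X), plus the special case of Distinct, can be sketched with the miniature below. The logical and execution objects and ToyBasicOperators are hypothetical, not Spark classes. The Distinct branch copies the two-phase shape from the code above: in Spark 1.x the inner operator (partial = true) deduplicates within each partition without a shuffle, and the outer one (partial = false) requires rows to be clustered by all output columns before deduplicating again. Join is deliberately left unhandled to show the `case _ => Nil` fallthrough, which in the real planner lets another strategy (such as HashJoin) take over.

```scala
// Hypothetical miniature ASTs mimicking the logical.X / execution.X convention.
object logical {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Distinct(child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan // not handled below
}

object execution {
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class Distinct(partial: Boolean, child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
}

object ToyBasicOperators {
  // Returns Nil for nodes it does not handle, like `case _ => Nil` above.
  def apply(plan: logical.Plan): Seq[execution.Plan] = plan match {
    case logical.Relation(name) => execution.Scan(name) :: Nil
    // Two-phase distinct: per-partition (partial = true) wrapped by final (partial = false).
    case logical.Distinct(child) =>
      execution.Distinct(partial = false,
        execution.Distinct(partial = true, planLater(child))) :: Nil
    case logical.Limit(n, child) => execution.Limit(n, planLater(child)) :: Nil
    case _ => Nil
  }

  // Simplified planLater: this toy has a single strategy, so take its first result.
  def planLater(plan: logical.Plan): execution.Plan = apply(plan).head
}

println(ToyBasicOperators.planLater(logical.Distinct(logical.Relation("t"))))
// Distinct(false,Distinct(true,Scan(t)))
```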

Original article: http://outofmemory.cn/zaji/5669794.html
