After being processed by the SqlParser, Analyzer, and Optimizer, the resulting logical plan cannot be handled as an ordinary Job. To treat it like any other Job, the logical execution plan must be converted into a physical execution plan.
The physical execution plan: SparkPlan
The conversion starts with SparkPlanner, which in effect invokes QueryPlanner's apply method, shown below:
    def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
      val iter = strategies.view.flatMap(_(plan)).toIterator
      assert(iter.hasNext, s"No plan for $plan")
      iter
    }
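Two details of this apply method are easy to miss: the strategies are tried in list order, and because of the lazy view only as many strategies are evaluated as are needed to produce the first candidate plan. The following self-contained sketch (toy code, not Spark source; all names are invented for illustration) demonstrates that behaviour:

    // Toy illustration of strategies.view.flatMap(_(plan)).toIterator:
    // with a lazy view, strategies after the first successful one never run.
    object LazyStrategyDemo extends App {
      type Plan = String
      // Each "strategy" maps an input plan to zero or more candidate plans.
      val strategies: Seq[Plan => Seq[Plan]] = Seq(
        p => { println("strategy A tried"); Nil },                   // no match, falls through
        p => { println("strategy B tried"); Seq(s"Physical($p)") },  // first candidate plan
        p => { println("strategy C tried"); Seq(s"Fallback($p)") }   // never evaluated below
      )

      val plan: Plan = "LogicalPlan"
      val iter = strategies.view.flatMap(_(plan)).toIterator
      assert(iter.hasNext, s"No plan for $plan")
      println(iter.next())  // prints Physical(LogicalPlan); only A and B were tried
    }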
The strategies used by SparkPlanner are defined as follows:
    def strategies: Seq[Strategy] =
      extraStrategies ++ (
        CommandStrategy(self) ::
        DataSourceStrategy ::
        TakeOrdered ::
        HashAggregation ::
        LeftSemiJoin ::
        HashJoin ::
        InMemoryScans ::
        ParquetOperations ::
        BasicOperators ::
        CartesianProduct ::
        BroadcastNestedLoopJoin :: Nil)
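Note that extraStrategies is prepended to the built-in list; it is the hook for injecting user-defined strategies. The sketch below shows the minimal contract (returning Nil means "not handled here", so the planner falls through to the next strategy). NoOpStrategy is a hypothetical name, and the exact registration point differs across Spark 1.x versions, so both registration lines are assumptions to verify against your version:

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // A do-nothing strategy: returning Nil defers the node to the remaining
    // strategies in the planner's list.
    object NoOpStrategy extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case _ => Nil
      }
    }

    // Registration (version-dependent, treat as an assumption):
    //   sqlContext.extraStrategies = NoOpStrategy :: Nil               // older 1.x
    //   sqlContext.experimental.extraStrategies = NoOpStrategy :: Nil  // 1.3+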
Every Strategy implements the apply method. Among these strategies, BasicOperators is the most commonly used; its implementation, shown below as "the BasicOperators implementation in SparkStrategies", handles the most common SQL operators. Each matching branch calls planLater on its child (see "QueryPlanner's planLater method" below), which applies the SparkPlanner to the child LogicalPlan. This sets up a recursive process that ultimately converts the entire LogicalPlan tree with SparkPlanner; a simplified model of this recursion is sketched after the planLater code below.
The BasicOperators implementation in SparkStrategies:
    object BasicOperators extends Strategy {
      def numPartitions = self.numPartitions

      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case logical.Distinct(child) =>
          execution.Distinct(partial = false,
            execution.Distinct(partial = true, planLater(child))) :: Nil
        case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
          execution.ExternalSort(sortExprs, global = true, planLater(child)) :: Nil
        case logical.Sort(sortExprs, child) =>
          execution.Sort(sortExprs, global = true, planLater(child)) :: Nil
        case logical.SortPartitions(sortExprs, child) =>
          execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
        case logical.Project(projectList, child) =>
          execution.Project(projectList, planLater(child)) :: Nil
        case logical.Filter(condition, child) =>
          execution.Filter(condition, planLater(child)) :: Nil
        case logical.Aggregate(group, agg, child) =>
          execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
        case logical.Sample(fraction, withReplacement, seed, child) =>
          execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
        case SparkLogicalPlan(alreadyPlanned) =>
          alreadyPlanned :: Nil
        case logical.LocalRelation(output, data) =>
          val nPartitions = if (data.isEmpty) 1 else numPartitions
          PhysicalRDD(
            output,
            RDDConversions.productToRowRdd(sparkContext.parallelize(data, nPartitions),
              StructType.fromAttributes(output))) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
          execution.Limit(limit, planLater(child)) :: Nil
        case Unions(unionChildren) =>
          execution.Union(unionChildren.map(planLater)) :: Nil
        case logical.Except(left, right) =>
          execution.Except(planLater(left), planLater(right)) :: Nil
        case logical.Intersect(left, right) =>
          execution.Intersect(planLater(left), planLater(right)) :: Nil
        case logical.Generate(generator, join, outer, _, child) =>
          execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
        case logical.NoRelation =>
          execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
        case logical.Repartition(expressions, child) =>
          execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
        case e @ EvaluatePython(udf, child, _) =>
          BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
        case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
        case _ => Nil
      }
    }
QueryPlanner's planLater method:
    protected def planLater(plan: LogicalPlan) = apply(plan).next()
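Putting apply and planLater together, the recursion can be modeled with a few lines of self-contained toy code (again not Spark source; the Logical/Physical case classes and ToyPlanner are invented for illustration):

    // A simplified model of the planLater recursion: each strategy plans one
    // node and calls planLater on its children, so the whole logical tree is
    // converted into a physical tree.
    object PlanLaterDemo extends App {
      sealed trait Logical
      case class LFilter(child: Logical) extends Logical
      case class LProject(child: Logical) extends Logical
      case object LScan extends Logical

      sealed trait Physical
      case class PFilter(child: Physical) extends Physical
      case class PProject(child: Physical) extends Physical
      case object PScan extends Physical

      type Strategy = Logical => Seq[Physical]

      object ToyPlanner {
        // Mirrors QueryPlanner.planLater: plan a child by re-entering the planner.
        def planLater(plan: Logical): Physical = apply(plan).next()

        // Mirrors the shape of BasicOperators: one case per logical operator.
        val basicOperators: Strategy = {
          case LFilter(child)  => PFilter(planLater(child)) :: Nil
          case LProject(child) => PProject(planLater(child)) :: Nil
          case LScan           => PScan :: Nil
          case _               => Nil
        }

        def apply(plan: Logical): Iterator[Physical] = {
          val strategies: Seq[Strategy] = Seq(basicOperators)
          val iter = strategies.view.flatMap(_(plan)).toIterator
          assert(iter.hasNext, s"No plan for $plan")
          iter
        }
      }

      // LProject(LFilter(LScan)) is planned into PProject(PFilter(PScan))
      println(ToyPlanner(LProject(LFilter(LScan))).next())
    }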
In essence, BasicOperators maps each logical.XXX operator to the corresponding execution.XXX operator, and converts a LogicalRDD into a PhysicalRDD. The result of this conversion can be inspected from user code, as shown below.
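The intermediate plans are exposed through queryExecution (assuming a Spark 1.x SQLContext in scope and a registered table named src, both hypothetical here; sql() returns a SchemaRDD on 1.2 and a DataFrame on 1.3+):

    val query = sqlContext.sql("SELECT key, COUNT(*) FROM src GROUP BY key")
    println(query.queryExecution.optimizedPlan)  // LogicalPlan after the Optimizer
    println(query.queryExecution.sparkPlan)      // physical plan produced by SparkPlanner
    println(query.queryExecution.executedPlan)   // physical plan after prepareForExecution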
prepareForExecution performs the final preparation before execution. It works on the same principle as the Analyzer and Optimizer (a RuleExecutor applying batches of rules), so it is not covered in detail here; a rough sketch of its shape follows.
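In the Spark 1.x source, prepareForExecution is a RuleExecutor over SparkPlan rather than over LogicalPlan. The sketch below shows its approximate shape; the exact batch contents vary across versions, so treat it as an approximation rather than the literal source:

    @transient
    protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
      val batches =
        // AddExchange inserts Exchange (shuffle) operators where the physical
        // plan's partitioning does not satisfy an operator's requirements.
        Batch("Add exchange", Once, AddExchange(self)) :: Nil
    }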