- 前言
- Spark Sql执行全过程
- Spark Sql 实际转换过程
- Spark Sql逻辑计划
- 执行sql样例
- Spark sql物理计划
Spark SQL的前身是shark,即“Hive on Spark”。Shark项目最初启动于2011年。当时HIve几乎算是唯一的SQL-on-Hadoop选择方案。Hive将SQL语句翻译为MapReduce,性能受限于MapReduce计算模型,始终无法满足各种交互式sql分析的需求,Shark建立在Hive代码的基础上,只修改了内存管理、物理计划、执行3个模块中的部分逻辑。Shark通过将Hive部分物理执行计划交换出来,最终简化HiveOL转换为Spark计算模型,使之能运行在Spark引擎上,从而使得SQL查询的速度得到10~100倍的提升。Shark最大的特征是与Hive完全兼容。
随着Spark的不断发展,shark对hive的重度依赖体现在架构上的瓶颈越来越突出。一方面,Hive的语法解析和查询优化等模块本身针对的是MapReduce,限制了在Spark系统上的深度优化和维护;另一方面,过度依赖Hive制约了Spark的“One stack rule them all”既定方针,也制约了技术栈中各个组件的灵活集成。
在此背景下,SparkSql项目被提出来,SparkSql摒弃原有的shark架构方式,但汲取了shark的一些优点,内存列存储、Hive兼容性等 重新开发了sql各个模块的代码。由于摆脱了对hive的依赖,sparksql在数据兼容、性能优化、组件扩展方面都得到了极大的提升。
逻辑计划阶段会将用户所写的SQL语句转换成树型数据结构(逻辑算子树),SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。逻辑算子树的生成过程历经3个子阶段,分别对应未解析的逻辑算子树(Unresolved LogicalPlan,仅仅是数据结构,不包含任何数据信息等)、解析后的逻辑算子树(Analyzed LogicalPlan,节点中绑定的各种信息)和优化后的逻辑算子树(Optimized LogicalPlan,应用各种优化规则对一些低效的逻辑计划进行转换)。
物理计划阶段将上一步逻辑计划阶段生成的逻辑算子树进行进一步转换,生成物理算子树。物理算子树的节点直接生成RDD或对RDD进行transformation *** 作(每个物理计划节点都实现了对RDD进行转换的excute方法)。
同样的,物理计划阶段也包含3个子阶段:首先,根据逻辑算子树,生成物理算子树列表Iterator[PhysicalPlan](同样的逻辑算子树可能对应多个物理算子树);然后,从列表中按照一定的策略选取最优的物理算子树(SparkPlan);最后,对选取的物理算子树进行提交前的准备工作,例如,确保分区 *** 作正确、物理算子树节点重用、执行代码生成等,得到“准备后”的物理算子树(PreparedSparkPlan).
从Sql语句解析一直到提交之前,上述整个转换过程都在Spark集群的Driver端进行,不涉及分布式环境。SparkSession类的sql方法调用SessionState中的各种对象,包括上述不同阶段对应的SparkSqlParser类,Analyzer类、Optimizer类和SparkPlanner类等,最后封装成一个QueryExecution对象。
左上角的sql语句,生成的逻辑算子树中有Relation、Filter和Project节点,分别对应数据表、过滤逻辑(age>11)和列裁剪(select name)。下一步的物理算子树从逻辑算子树一对一映射得到,Relation逻辑节点转换成FileSourceScanExec执行节点,Filter逻辑节点转换为FilterExec执行节点,Project逻辑节点转换为ProjectExec执行节点。
生成的物理算子树的根节点是ProjectExec,每个物理节点中的execute函数都是执行调用接口,由根节点开始递归调用,从叶子节点开始执行。
1)由SparkSqlParser中的AstBuilder执行节点访问,将语法树的各种Context节点转换成对应的LogicalPlan节点,从而成为一棵未解析的逻辑算子树,此时的逻辑算子树是最初形态,不包含数据信息与列信息。
2)由Analyzer将一系列的规则作用在Unreserved LogicalPlan上,对树上的节点绑定各种数据信息,生成解析后的逻辑算子树。
3)由Spark Sql中的优化器(Optimizer)将一系列优化规则作用于上一步生成的逻辑算子树中,在确保结果正确的前提下改写其中的低效结构,生成优化后的逻辑算子树。
student.json
[ {"id": 1,"name": "Kate","age": 29}, {"id": 2,"name": "Andy","age": 30}, {"id": 3,"name": "Tony","age": 10} ]
spark sql获取sql各阶段逻辑计划代码
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{QueryExecution, SparkSqlParser} object LogicalPlanTest { val sqlTest1: String = "select name from student s1 where age > 11" val sqlTest2: String = "select distinct name from student" val sqlTest3: String = "select name from student group by name" val sqlTest4: String = "select name from (select * from student)" val sqlTest5: String = "select name from student" def main(args: Array[String]): Unit = { val session: SparkSession = SparkUtil.getSparkSession("LogicalPlanTest") session.read .option("multiLine", value = true).option("mode", "PERMISSIVE") .json("student.json").createOrReplaceTempView("student") val parser = new SparkSqlParser(session.sessionState.conf) val plan: LogicalPlan = parser.parsePlan(sqlTest2) val execution: QueryExecution = session.sessionState.executePlan(plan) println("----------Unresolved LogicalPlan----------") val logical: LogicalPlan = execution.logical println(logical.toString()) println("----------Analyzed LogicalPlan----------") val analyzed: LogicalPlan = execution.analyzed println(analyzed.toString()) println("----------Optimizer LogicalPlan----------") val optimized: LogicalPlan = execution.optimizedPlan println(optimized.toString()) } }
程序输出结果:
----------Unresolved LogicalPlan---------- 'Project ['name] +- 'Filter ('age > 11) +- 'SubqueryAlias `s1` +- 'UnresolvedRelation `student` ----------Analyzed LogicalPlan---------- Project [name#2] +- Filter (age#0L > cast(11 as bigint)) +- SubqueryAlias `s1` +- SubqueryAlias `student` +- Relation[age#0L,id#1L,name#2] json ----------Optimizer LogicalPlan---------- Project [name#2] +- Filter (isnotnull(age#0L) && (age#0L > 11)) +- Relation[age#0L,id#1L,name#2] json
可以看到各阶段逻辑计划可以和转换过程验证
Spark sql物理计划 1)。由SparkPlanner将各种物理计划策略(Strategy)作用于对应的LogicalPlan节点上,生成SparkPlan列表(一个LogicalPlan可能产生多个sparkplan)
2)。选取最佳的SparkPlan,在老版本中2.1版本中是直接采用next方法获取第一个
3)。提交前进行准备工作,进行一些分区排序方面的处理,确保sparkplan各节点能够正确执行,这一步通过prepareForExecution()方法调用若干规则(Rule)进行转换。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)