SparkSql优化策略

SparkSql优化策略,第1张

SparkSql优化策略

有了SparkCore为什么还要有SparkSql呢?

有两大原因:

  • 一是SparkCore只能用Api,这就把很多SqlBoy拒之门外,Spark就无法发扬光大了;
  • 二是使用Api时用户编写的函数作为一个个闭包被序列化后分发到Executor执行,Spark无法对用户自定义的代码进行优化;

基于以上原因,SparkSql横空出世,并提供强大的、一篮子的优化方案,以便使用户专注于业务需求的实现,把性能优化交给spark框架。

SparkSql提供如下优化措施:

一,Catalyst 优化器

Catalyst 主要负责三个工作:

  • 一是根据Sql生成语法树(执行计划);
  • 二是执行计划的逻辑优化;
  • 三是执行计划的物理优化

以如下Sql为例,来看看Catalyst 是如何工作的:

select
	student_id,
	count(1)
from
	(
	select
		student_id,
		age
	from
		score
	where
		age > 10
)tmp
left join score 
on
	score.student_id = tmp.student_id
where score > 60
group by
	tmp.student_id
首先,生成逻辑语法树

逻辑语法树也可以称之为执行计划,如下:


这是Catalyst 解析出来的语法树,也可以认为是执行计划,从上往下并行执行。

按照这个计划,其实就可以去执行了,但是Catalyst 还会对其进行优化。

其次,执行计划的逻辑优化

从图上可以看出,有几个点是可以优化的:

  • 一是对score > 60的过滤时机可以提前,这样的话,score < 60的数据就不会加载到内存中。
  • 二是student和score表中可能有大量字段,但sql中只用到student_id、score、age这几个字段,其他字段无需加载到内存中。

以上分别对应着Catalyst 的两个优化手段:谓词下推、列裁剪。

像谓词下推、列剪枝这样的特性,都被称为启发式的规则或策略。而 Catalyst 优化器的核心职责之一,就是在逻辑优化阶段,基于启发式的规则和策略调整、优化执行计划。经过逻辑阶段的优化之后,执行计划如下:

逻辑优化是基于规则的优化,是静态的,Spark并没有到此为止,接下来会对执行计划进行动态的调整优化,动态调整的依据是运行中的数据统计。

第三,执行计划的物理优化

在执行过程中,Spark会根据数据集的大小进行计算策略的调整,以join为例,会根据数据集的大小选择Join的方式、数据分发的方式,比如有一个数据集比较小,会选择Broadcast方式将数据分发出去,节省网络分发的时间,提高性能。

二,Tungsten

继Catalyst 的优化之后,Tungsten 又出场了,其主要在数据结构和执行代码方面进行优化,主要的目的是为了更高效率的利用内存和CPU,如将空间利用率的java对象变为UnsafeRow;为了减少昂贵的方法调用,将一个Stage多个算子整合为一个函数;

1,Unsafe Row。

对于每条数据记录,Spark SQL默认采用Row 对象来进行封装和存储。Java对象是一种空间利用率不高的存储,比如与数据本身无关的对象头信息,为了补足长度的对齐部分,会产生占用相当可观的额外空间。

针对这个问题,Tungsten 设计并实现了一种叫做 Unsafe Row 的二进制数据结构。Unsafe Row是一种二进制数据结构,以非常紧凑的结构存储数据,如下所示:


Unsafe Row避免了大量的额外信息的存储,极大的提高了空间利用率,对于Spark这种重度内存依赖型计算引擎,有非常大的性能提升作用。

通常我们在写代码的过程中,并不经常直接使用Unsafe Row,Spark计算产生的中间结果和输出会使用到。

参考:Spark避坑指南----UnsafeRow对象的持久化

2,全阶段代码生成(WSCG,Whole Stage Code Generation)

Tungsten 让Spark越来越快

对于同一个Stage的多个算子,本质上是多个函数的链式调用,伴随着很多基本类型的装箱 *** 作,Tungsten对这些代码进行分析,将多个函数融合为一个函数,将多次输入输出变为一次输入输出,减少了函数调用和参数封装。

三,AQE(Adaptive Query Execution)

AQE 的全称是 Adaptive Query Execution,自适应查询执行。

AQE主要是针对Shuffle进行的优化,包含了 3 个动态优化特性:

  • Join 策略调整
  • 自动分区合并
  • 自动倾斜处理
1,开启

AQE 机制默认是未开启的,要想充分利用上述的 3 个特性,通过如下配置开启:

 spark.sql.adaptive.enabled=true
2, Join 策略调整

在运行的过程中,AQE会动态跟踪数据的变化,如A/B两个表Join,如果这两个表都是大表,在生成执行计划时,只能选择Shuffle Join,但后续对B进行过滤或者聚合后,数据量大幅减少,AQE将会动态的将Shuffle Join调整为Broadcast Join。

这里看的出AQE的Join调整策略依赖于Shuffle的中间文件,因为其需要根据中间文件的大小去决策是否调整Join方式。

3,自动分区合并

在Shuffle Write结束后,可能会产生很多数据量非常小的分区,并行度高但分区小会导致大量的调度,反而不利于作业的执行。

AQE会对小分区进行合并,多个小分区合并为一个大分区,减少Reduce阶段的并行度。

涉及到自动分区合并的有两个参数:

  • spark.sql.adaptive.advisoryPartitionSizeInBytes 默认64MB
  • spark.sql.adaptive.coalescePartitions.minPartitionNum,最小分区数,默认spark集群的默认并行度

第一个参数设置分区的最小尺寸,AQE会根据这个参数确定是否需要合并。
第二个参数确定最小分区数,合并后的分区数不能低于该配置。

AQE合并小分区的逻辑是,按分区ID逐个判断分区大小,如果分区小于最小分区尺寸,就将其与下一个分区合并;如果比最小分区大,就不合并。

4,自动倾斜处理

自动处理spark数据倾斜

数据倾斜是大数据处理过程中不可避免的问题,AQE可以自动识别出各个分区的数据倾斜,并对大分区进行拆分,实现自动倾斜处理。

Spark 自动倾斜处理的思路是根据配置识别出数据倾斜的分区,针对数据倾斜的分区单独处理,对于ShuffleWrite产生的多个要分发到Reduce同一个分区的数据,切分为多个下游分区,由下游多个Reduce任务处理。

如下图所示(图来自参考文章):

上图表示的是一个join,左右两边表示两个不同的表的Shuffle Read,中间是Reduce Shuffle Write,从上游拉取属于本分区的数据。

左边图示中的Partition0出现了数据倾斜,如果不进行处理,所有的Partition0都会汇聚到一个Reduce任务,对整个作业的效率产生负面影响。

ADE会将Partition0进行切分,Reduce端启用两个Task拉取Partition0的数据,每个任务单独拉取右表的Partition0数据,也就是说,对于右表的Partition0来说,就好像是被Broadcast到了下游的Reduce任务。

涉及到的配置:

  • spark.sql.adaptive.skewedJoin.enabled 设置为 true 即可自动处理 Join 时数据倾斜

  • spark.sql.adaptive.skewedPartitionMaxSplits 控制处理一个倾斜 Partition 的 Task 个数上限,默认值为 5

  • spark.sql.adaptive.skewedPartitionRowCountThreshold 设置了一个 Partition 被视为倾斜 Partition 的行数下限,也即行数低于该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即一千万

  • spark.sql.adaptive.skewedPartitionSizeThreshold 设置了一个 Partition 被视为倾斜 Partition 的大小下限,也即大小小于该值的 Partition 不会被视作倾斜 Partition。其默认值为 64 * 1024 * 1024 也即 64MB

  • spark.sql.adaptive.skewedPartitionFactor 该参数设置了倾斜因子。如果一个 Partition 的大小大于

  • spark.sql.adaptive.skewedPartitionSizeThreshold 的同时大于各 Partition 大小中位数与该因子的乘积,

  • spark.sql.adaptive.skewedPartitionRowCountThreshold 行数大于的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition

AQE判断倾斜的标准是:

  • 1,找到所有倾斜分区尺寸的中位数
  • 2,找到所有倾斜分区行数的中位数
  • 3,如果一个分区的行数大于 spark.sql.adaptive.skewedPartitionRowCountThreshold,同时,还要大于当前分区行数与spark.sql.adaptive.skewedPartitionFactor的乘积

举例说明:

一个分区的行数2000万
spark.sql.adaptive.skewedPartitionRowCountThreshold 配置为1000万
分区中位数是900万
倾斜因子是2,2000万> 1000万,同时2000万> 900万 * 2 = 1800W,该分区是倾斜分区
如果倾斜因子是3,则2000万 < 900万 * 3,当前分区不是倾斜分区

  • 4,如果一个分区的尺寸大于 spark.sql.adaptive.skewedPartitionSizeThreshold ,同时,还要大于当前分区尺寸与spark.sql.adaptive.skewedPartitionFactor的乘积。

举例说明:

一个分区大小是2G
spark.sql.adaptive.skewedPartitionRowCountThreshold 配置为1G
分区中位数是900M
倾斜因子是2,2G> 1G,同时2G> 900M * 2 ,该分区是倾斜分区
如果倾斜因子是3,虽然2G> 1G,但是2G < 900M * 3,当前分区不是倾斜分区

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5656375.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存