Spark 数据倾斜及其解决方案

Spark 数据倾斜及其解决方案,第1张

本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。

一、什么是数据倾斜

对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。

对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟,当机器数量增加到3台时,理想的耗时为120 / 3 = 40分钟。但是,想做到分布式情况下每台机器执行时间是单机时的1 / N,就必须保证每台机器的任务量相等。不幸的是,很多时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理 80% 的任务,另外两台机器各处理 10% 的任务。

『不患多而患不均』,这是分布式环境下最大的问题。意味着计算能力不是线性扩展的,而是存在短板效应: 一个 Stage 所耗费的时间,是由最慢的那个 Task 决定。

由于同一个 Stage 内的所有 task 执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同 task 之间耗时的差异主要由该 task 所处理的数据量决定。所以,要想发挥分布式系统并行计算的优势,就必须解决数据倾斜问题。

二、数据倾斜的危害

当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。

另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。

三、数据倾斜的现象

当发现如下现象时,十有八九是发生数据倾斜了:

四、数据倾斜的原因

在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等 *** 作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。比如大部分 key 对应10条数据,但是个别 key 却对应了100万条数据,那么大部分 task 可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别 task 可能分配到了100万数据,要运行一两个小时。

因此出现数据倾斜的时候,Spark 作业看起来会运行得非常缓慢,甚至可能因为某个 task 处理的数据量过大导致内存溢出。

五、问题发现与定位

1、通过 Spark Web UI

通过 Spark Web UI 来查看当前运行的 stage 各个 task 分配的数据量(Shuffle Read Size/Records),从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。

知道数据倾斜发生在哪一个 stage 之后,接着我们就需要根据 stage 划分原理,推算出来发生倾斜的那个 stage 对应代码中的哪一部分,这部分代码中肯定会有一个 shuffle 类算子。可以通过 countByKey 查看各个 key 的分布。

2、通过 key 统计

也可以通过抽样统计 key 的出现次数验证。

由于数据量巨大,可以采用抽样的方式,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个:

如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。

六、如何缓解数据倾斜

基本思路

思路1. 过滤异常数据

如果导致数据倾斜的 key 是异常数据,那么简单的过滤掉就可以了。

首先要对 key 进行分析,判断是哪些 key 造成数据倾斜。具体方法上面已经介绍过了,这里不赘述。

然后对这些 key 对应的记录进行分析:

解决方案

对于第 1,2 种情况,直接对数据进行过滤即可。

第3种情况则需要特殊的处理,具体我们下面详细介绍。

思路2. 提高 shuffle 并行度

Spark 在做 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。

如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

(1) *** 作流程

RDD *** 作 可在需要 Shuffle 的 *** 作算子上直接设置并行度或者使用 spark.default.parallelism 设置。如果是 Spark SQL,还可通过 SET spark.sql.shuffle.partitions=[num_tasks] 设置并行度。默认参数由不同的 Cluster Manager 控制。

dataFrame 和 sparkSql 可以设置 spark.sql.shuffle.partitions=[num_tasks] 参数控制 shuffle 的并发度,默认为200。

(2)适用场景

大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。

(3)解决方案

调整并行度。一般是增大并行度,但有时如减小并行度也可达到效果。

(4)优势

实现简单,只需要参数调优。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。

(5)劣势

适用场景少,只是让每个 task 执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些 key 的大小非常大,即使一个 task 单独执行它,也会受到数据倾斜的困扰。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

思路3. 自定义 Partitioner

(1)原理

使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。

例如,我们在 groupByKey 算子上,使用自定义的 Partitioner:

(2)适用场景

大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。

(3)解决方案

使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽量将所有不同的 Key 均匀分配到不同的 Task 中。

(4)优势

不影响原有的并行度设计。如果改变并行度,后续 Stage 的并行度也会默认改变,可能会影响后续 Stage。

(5)劣势

适用场景有限,只能将不同 Key 分散开,对于同一 Key 对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的 Partitioner,不够灵活。

思路4. Reduce 端 Join 转化为 Map 端 Join

通过 Spark 的 Broadcast 机制,将 Reduce 端 Join 转化为 Map 端 Join,这意味着 Spark 现在不需要跨节点做 shuffle 而是直接通过本地文件进行 join,从而完全消除 Shuffle 带来的数据倾斜。

其中 A 是比较小的 dataframe 并且能够整个存放在 executor 内存中。

(1)适用场景

参与Join的一边数据集足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中。

(2)解决方案

在 Java/Scala 代码中将小数据集数据拉取到 Driver,然后通过 Broadcast 方案将小数据集的数据广播到各 Executor。或者在使用 SQL 前,将 Broadcast 的阈值调整得足够大,从而使 Broadcast 生效。进而将 Reduce Join 替换为 Map Join。

(3)优势

避免了 Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。

(4)劣势

因为是先将小数据通过 Broadcase 发送到每个 executor 上,所以需要参与 Join 的一方数据集足够小,并且主要适用于 Join 的场景,不适合聚合的场景,适用条件有限。

思路5. 拆分 join 再 union

思路很简单,就是将一个 join 拆分成 倾斜数据集 Join 和 非倾斜数据集 Join,最后进行 union:

(1)适用场景

两张表都比较大,无法使用 Map 端 Join。其中一个 RDD 有少数几个 Key 的数据量过大,另外一个 RDD 的 Key 分布较为均匀。

(2)解决方案

将有数据倾斜的 RDD 中倾斜 Key 对应的数据集单独抽取出来加上随机前缀,另外一个 RDD 每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。

(3)优势

相对于 Map 则 Join,更能适应大数据集的 Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。

(4)劣势

如果倾斜 Key 非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜 Key 与非倾斜 Key 分开处理,需要扫描数据集两遍,增加了开销。

思路6. 大表 key 加盐,小表扩大 N 倍 jion

如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

其实就是上一个方法的特例或者简化。少了拆分,也就没有 union。

(1)适用场景

一个数据集存在的倾斜 Key 比较多,另外一个数据集数据分布比较均匀。

(2)优势

对大部分场景都适用,效果不错。

(3)劣势

需要将一个数据集整体扩大 N 倍,会增加资源消耗。

思路7. map 端先局部聚合

在 map 端加个 combiner 函数进行局部聚合。加上 combiner 相当于提前进行 reduce ,就会把一个 mapper 中的相同 key 进行聚合,减少 shuffle 过程中数据量 以及 reduce 端的计算量。这种方法可以有效的缓解数据倾斜问题,但是如果导致数据倾斜的 key 大量分布在不同的 mapper 的时候,这种方法就不是很有效了。

思路8. 加盐局部聚合 + 去盐全局聚合

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个 key 都打上一个 1~n 的随机数,比如 3 以内的随机数,此时原先一样的 key 就变成不一样的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成 (1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行 reduceByKey 等聚合 *** 作,进行局部聚合,那么局部聚合结果,就会变成了 (1_hello, 2) (2_hello, 2) (3_hello, 1)。然后将各个 key 的前缀给去掉,就会变成 (hello, 2) (hello, 2) (hello, 1),再次进行全局聚合 *** 作,就可以得到最终结果了,比如 (hello, 5)。

不过进行两次 mapreduce,性能稍微比一次的差些。

七、Hadoop 中的数据倾斜

Hadoop 中直接贴近用户使用的是 Mapreduce 程序和 Hive 程序,虽说 Hive 最后也是用 MR 来执行(至少目前 Hive 内存计算并不普及),但是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,因此这里稍作区分。

Hadoop 中的数据倾斜主要表现在 ruduce 阶段卡在99.99%,一直99.99%不能结束。

这里如果详细的看日志或者和监控界面的话会发现:

经验: Hive的数据倾斜,一般都发生在 Sql 中 Group 和 On 上,而且和数据逻辑绑定比较深。

优化方法

这里列出来一些方法和思路,具体的参数和用法在官网看就行了。

说明

八、参考文章

从速度的角度看,Spark从流行的MapReduce模型继承而来,可以更有效地支持多种类型的计算,如交互式查询和流处裂芹理。速度在大数据集的处理中非常重要,它可以决定用户可以交互式地处理数据,还是等几分钟甚至几小时。Spark为速度提供的一个重要特性是其可以在内存中运行计算,即使对基于磁盘的复杂应用,Spark依然比MapReduce更有效。

从通用性来说,Spark可以处理之前需要多个独立的分布式系统来处理的任务,这些任务包括批处理应用、交互式算法、交互式查询和数据流。通过用同一个引擎支持这些任务,Spark使得合并不同的处理类型变得简单,而合并罩源升 *** 作在生产数据分析中频繁使用。而且,Spark降低了维护不同工具的管理负担。

Spark被设计的高度易访问,用Python、Java、Scala和SQL提供简单的API,而且提供丰富的内建库。Spark也与其他大数据工具进行了集成。特别地,Spark可以运行在Hadoop的集群上,可以访问任何Hadoop的数据源,包括Cassandra。

Spark 核心组件

Spark核心组件包含Spark的基本功能,有任务调度组件、物老内存管理组件、容错恢复组件、与存储系统交互的组件等。Spark核心组件提供了定义d性分布式数据集(resilient distributed datasets,RDDs)的API,这组API是Spark主要的编程抽象。RDDs表示分布在多个不同机器节点上,可以被并行处理的数据集合。Spark核心组件提供许多API来创建和 *** 作这些集合。

Spark SQLSpark SQL是Spark用来处理结构化数据的包。它使得可以像Hive查询语言(Hive Query Language, HQL)一样通过SQL语句来查询数据,支持多种数据源,包括Hive表、Parquet和JSON。除了为Spark提供一个SQL接口外,Spark SQL允许开发人员将SQL查询和由RDDs通过Python、Java和Scala支持的数据编程 *** 作混合进一个单一的应用中,进而将SQL与复杂的分析结合。与计算密集型环境紧密集成使得Spark SQL不同于任何其他开源的数据仓库工具。Spark SQL在Spark 1.0版本中引入Spark。

Shark是一个较老的由加利福尼亚大学和伯克利大学开发的Spark上的SQL项目,通过修改Hive而运行在Spark上。现在已经被Spark SQL取代,以提供与Spark引擎和API更好的集成。

Spark流(Spark Streaming)Spark流作为Spark的一个组件,可以处理实时流数据。流数据的例子有生产环境的Web服务器生成的日志文件,用户向一个Web服务请求包含状态更新的消息。Spark流提供一个和Spark核心RDD API非常匹配的 *** 作数据流的API,使得编程人员可以更容易地了解项目,并且可以在 *** 作内存数据、磁盘数据、实时数据的应用之间快速切换。Spark流被设计为和Spark核心组件提供相同级别的容错性,吞吐量和可伸缩性。

MLlibSpark包含一个叫做MLlib的关于机器学习的库。MLlib提供多种类型的机器学习算法,包括分类、回归、聚类和协同过滤,并支持模型评估和数据导入功能。MLlib也提供一个低层的机器学习原语,包括一个通用的梯度下降优化算法。所有这些方法都可以应用到一个集群上。

GraphXGraphX是一个 *** 作图(如社交网络的好友图)和执行基于图的并行计算的库。与Spark流和Spark SQL类似,GraphX扩展了Spark RDD API,允许我们用和每个节点和边绑定的任意属性来创建一个有向图。GraphX也提供了各种各样的 *** 作图的 *** 作符,以及关于通用图算法的一个库。

集群管理器Cluster Managers在底层,Spark可以有效地从一个计算节点扩展到成百上千个节点。为了在最大化灵活性的同时达到这个目标,Spark可以运行在多个集群管理器上,包括Hadoop YARN,Apache Mesos和一个包含在Spark中的叫做独立调度器的简易的集群管理器。如果你在一个空的机器群上安装Spark,独立调度器提供一个简单的方式如果你已经有一个Hadoop YARN或Mesos集群,Spark支持你的应用允许在这些集群管理器上。第七章给出了不同的选择,以及如何选择正确的集群管理器。

谁使用Spark?用Spark做什么?

由于Spark是一个面向集群计算的通用框架,可用于许多不同的应用。使用者主要有两种:数据科学家和数据工程师。我们仔细地分析一下这两种人和他们使用Spark的方式。明显地,典型的使用案例是不同的,但我们可以将他们粗略地分为两类,数据科学和数据应用。

数据科学的任务数据科学,近几年出现的一门学科,专注于分析数据。尽管没有一个标准的定义,我们认为一个数据科学家的主要工作是分析和建模数据。数据科学家可能会SQL,统计学,预测模型(机器学习),用Python、MATLAB或R编程。数据科学家能将数据格式化,用于进一步的分析。

数据科学家为了回答一个问题或进行深入研究,会使用相关的技术分析数据。通常,他们的工作包含特殊的分析,所以他们使用交互式shell,以使得他们能在最短的时间内看到查询结果和代码片段。Spark的速度和简单的API接口很好地符合这个目标,它的内建库意味着很多算法可以随时使用。

Spark通过若干组件支持不同的数据科学任务。Spark shell使得用Python或Scala进行交互式数据分析变得简单。Spark SQL也有一个独立的SQL shell,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击它可以用SQL进行数据分析,也可以在Spark程序中或Spark shell中使用Spark SQL。MLlib库支持机器学习和数据分析。而且,支持调用外部的MATLAB或R语言编写的程序。Spark使得数据科学家可以用R或Pandas等工具处理包含大量数据的问题。

spark和hadoop的区别:诞生的先后顺序、计算不同、平台不同。

诞生的先后顺序,hadoop属于第一代开源大数据处理平台,而spark属于第二森盯代。属于下一代的spark肯定在综合评价上要优于第一代的hadoop。

计算不同spark和hadoop在分布式计算的底层思路上,其实宏昌是极为相似的,即mapreduce分布式运算模此绝和型:将运算分成两个阶段,阶段1-map,负责从上游拉取数据后各自运算,然后将运算结果shuffle给下游的reduce,reduce再各自对通过shuffle读取来的数据进行聚合运算spark和hadoop在分布式计算的具体实现上,又有区别;hadoop中的mapreduce运算框架,一个运算job,进行一次map-reduce的过程;而spark的一个job中,可以将多个map-reduce过程级联进行。

平台不同spark和hadoop区别是,spark是一个运算平台,而hadoop是一个复合平台(包含运算引擎,还包含分布式文件存储系统,还包含分布式运算的资源调度系统),所以,spark跟hadoop来比较的话,主要是比运算这一块大数据技术发展到目前这个阶段,hadoop主要是它的运算部分日渐式微,而spark目前如日中天,相关技术需求量大,offer好拿。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12397285.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-25
下一篇 2023-05-25

发表评论

登录后才能评论

评论列表(0条)

保存