spark数据倾斜

spark数据倾斜,第1张

spark数据倾斜

1、spark数据倾斜
(1)什么是数据倾斜?
数据倾斜指的是,并行处理的数据集里某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。比如统计单词频数的程序中某个Key对应的数据量非常大的话,就会产生数据倾斜,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。 另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。

(2)数据倾斜的原因?
数据倾斜的原因是:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,好比按照key进行聚合或join等 *** 作。此时若是某个key对应的数据量特别大的话,就会发生数据倾斜。好比大部分key对应10条数据,可是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,而后1秒钟就运行完了;可是个别task可能分配到了100万数据,要运行一两个小时。所以,整个Spark作业的运行进度是由运行时间最长的那个task决定的。所以出现数据倾斜的时候,Spark做业看起来会运行得很是缓慢,甚至可能由于某个task处理的数据量过大致使内存溢出。存

(2)如何判断和定位发生数据倾斜?
Spark Web UI 上task的执行时间或分配的数据量,如果一般task执行时间只有几秒,而某些task执行时间是几分钟甚至更久,那这部分task对应的stage就出现了数据倾斜。
定位出现问题的位置:
1、在程序里面找找,哪些地方用了会产生shuffle的算子,典型的有distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等
2、查看log日志,报内存溢出等异常的那行代码,或者从spark web ui 上task的执行时间或分配的数据量来判断, 对于执行特别慢的task进行分析,根据task划分的stage定位哪段代码中的算子导致了数据倾斜

(3)解决数据倾斜的方法有哪些?
1)过滤掉少数数据倾斜的key:
如果发现导致数据倾斜的key是极少数,并且对计算本身影响不大,那么这种方案比较适用。
   实现原理:通过spark的sample算子,定位到数据倾斜的key,然后使用filter算子将其过滤即可
2)提高shuffle的并行度:
这是一种尝试性策略:就是提高增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
3)两阶段的聚合(局部聚合和全局聚合)
适用场景:对RDD执行reduceByKey等这类有聚合 *** 作的shuffle算子或者spark SQL使用group by语句进行分组聚合,比较适用。
4)将reduce join 转换为map join(大小表):
实现原理:有reduce join的过程一定有shuffle,有shuffle就可能出现数据的倾斜,所以将reduce join使用map join 代替。如果一个RDD是比较小的,那么可以使用广播变量的方式,将小RDD发送到各个worker的executor中,实现本地的连接。
5)采样倾斜的key并拆分join *** 作(大大表):
适用场景:在hive两张表进行join的时候,如果两张表的数据都很大,并且,一张表的数据很均匀,但是另一张表的数据有少量的key数据量过大,此时使用这个解决方案
  实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。
6)使用随机前缀和扩容RDD进行join(大量key的数据倾斜):

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5699314.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存