SparkShuffle解析

SparkShuffle解析,第1张

SparkShuffle解析

Spark两种核心Shuffle:

 

HashShuffle

SortShuffle

一、HashShuffle 1.未经优化的HashShuffle

 在shuffleWrite阶段,也就是上层,每个task都会根据key进行hash划分,从而让相同hash值的key进入同一个blockfile文件,以供shuffleRead阶段的task拉取(溢写到磁盘形成blockfile之前,会先写入内存缓冲区中,填满后才可以溢写)。而blockfile的个数取决于下一个stage有多少个task,下游stage有多少个task,那么上游stage的每个task就会创建多少个blockfile。例如上图中,上游stage有4个task,下游有是哪个task,那么就一共会产生12个blockfile。

2.经过优化的HashShuffle

 为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制。此时会出现shuffleFileGroup的概念,同一个Executor中的task会将数据写到同一个shuffleFileGroup

优化后的HashShuffle,主要在产生的blockfile的个数上。未经优化的blockFile个数是上游stage的task个数N乘以下游stage的task个数M。N*M个。经过优化的个数则是以executor个数X乘以下游stage的task个数M。X*M个。使得task产生的磁盘文件可以先进行一定程度上的合并,这样可以大大减少产生的磁盘文件个数。

二、SortShuffle 1.普通机制的SortShuffle

        SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

        在该模式下,数据会根据shuffle算子的类型,以不同的数据结构写入内存数据结构中。如果是reduceByKey这种算子,会用Map类型。如果是join这种算子,那就用Array类型。每写入一条数据,都会判断是否达到了溢写的阈值,如果达到了,将溢写到磁盘。再溢写之前,会根据Key对数据进行排序,每批次10000条数据写入到一个内存缓冲区,当内存缓冲区溢满之后再一起写到磁盘文件中,这样可以减少磁盘IO。

        在内存缓冲区溢写到磁盘的过程中, 会产生很多个溢写文件,最终会进行一次merge,合并成一个文件。因为一个task只产生这一个溢写文件,该文件包含了下游Stage所有task所需的数据,所以还需要生成一个索引文件,用来标记每个下游Task在该溢写文件中的的startOffset和EndOffset。

        在此种Shuffle过程中,产生溢写文件的个数根据上游Stage中的 task个数决定。假如上游Task有50个,Executor有10个,下游Stage的Task有100个,未经优化的HashShuffle需要产生 50 *100=5000个溢写文件,优化的HashShuffle需要产生10*100=1000个溢写文件。而此种shuffle只需要50个溢写文件。

2.Bypass机制的SortShuffle

 

bypass运行机制的触发条件如下:

①shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值

②不是聚合类的shuffle算子

启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序 *** 作,也就节省掉了这部分的性能开销

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5479350.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存