Spark两种核心Shuffle:
HashShuffle
SortShuffle
一、HashShuffle 1.未经优化的HashShuffle在shuffleWrite阶段,也就是上层,每个task都会根据key进行hash划分,从而让相同hash值的key进入同一个blockfile文件,以供shuffleRead阶段的task拉取(溢写到磁盘形成blockfile之前,会先写入内存缓冲区中,填满后才可以溢写)。而blockfile的个数取决于下一个stage有多少个task,下游stage有多少个task,那么上游stage的每个task就会创建多少个blockfile。例如上图中,上游stage有4个task,下游有是哪个task,那么就一共会产生12个blockfile。
2.经过优化的HashShuffle为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制。此时会出现shuffleFileGroup的概念,同一个Executor中的task会将数据写到同一个shuffleFileGroup
优化后的HashShuffle,主要在产生的blockfile的个数上。未经优化的blockFile个数是上游stage的task个数N乘以下游stage的task个数M。N*M个。经过优化的个数则是以executor个数X乘以下游stage的task个数M。X*M个。使得task产生的磁盘文件可以先进行一定程度上的合并,这样可以大大减少产生的磁盘文件个数。
二、SortShuffle 1.普通机制的SortShuffleSortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
在该模式下,数据会根据shuffle算子的类型,以不同的数据结构写入内存数据结构中。如果是reduceByKey这种算子,会用Map类型。如果是join这种算子,那就用Array类型。每写入一条数据,都会判断是否达到了溢写的阈值,如果达到了,将溢写到磁盘。再溢写之前,会根据Key对数据进行排序,每批次10000条数据写入到一个内存缓冲区,当内存缓冲区溢满之后再一起写到磁盘文件中,这样可以减少磁盘IO。
在内存缓冲区溢写到磁盘的过程中, 会产生很多个溢写文件,最终会进行一次merge,合并成一个文件。因为一个task只产生这一个溢写文件,该文件包含了下游Stage所有task所需的数据,所以还需要生成一个索引文件,用来标记每个下游Task在该溢写文件中的的startOffset和EndOffset。
在此种Shuffle过程中,产生溢写文件的个数根据上游Stage中的 task个数决定。假如上游Task有50个,Executor有10个,下游Stage的Task有100个,未经优化的HashShuffle需要产生 50 *100=5000个溢写文件,优化的HashShuffle需要产生10*100=1000个溢写文件。而此种shuffle只需要50个溢写文件。
2.Bypass机制的SortShuffle
bypass运行机制的触发条件如下:
①shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
②不是聚合类的shuffle算子
启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序 *** 作,也就节省掉了这部分的性能开销
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)