12 Spark on RDD 分区器

12 Spark on RDD 分区器,第1张

12 Spark on RDD 分区

RDD 分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

Hash 分区:对于给定的 key,计算其 hashCode,并除以分区个数取余。

class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be 
negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}

Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the 
default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found 
$partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
...
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) = 
CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, 
rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1 }
if (partition > rangeBounds.length) {
partition = rangeBounds.length
} }
if (ascending) {
partition
} else {
rangeBounds.length - partition
} }
override def equals(other: Any): Boolean = other match {
...
}
override def hashCode(): Int = {
...
}
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = 
Utils.tryOrIOException {
...
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
...
} }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695222.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存