- 说明
- 分享
- 接口说明
- map
- filter
- flatMap
- mapPartitions
- mapPartitionsWithIndex
- mapWith
- flatMapWith
- coalesce
- repartition
- randomSplit
- glom
- union并集
- distinct
- 总结
分享本文记录一部分Spark RDD接口Scala代码实现。
- 大数据博客列表
- 对RDD中的每个元素执行一个指定函数产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应,实例如下:
val a =sc.parallelize(1 to 9, 3) val b =a.map(x => x*2) a.collect //Array[Int]= Array(1, 2, 3, 4, 5, 6, 7, 8, 9) b.collect //Array[Int]= Array(2, 4, 6, 8, 10, 12, 14, 16, 18)filter
- 对RDD中的每个元素执行一个指定的函数来过滤产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
val rdd =sc.parallelize(List(1,2,3,4,5,6)) val filterRdd = rdd.filter(_> 5) filterRdd.collect() //返回所有大于5的数据的一个Array,值Array(6,8,10,12)flatMap
- 与map类似,区别是map处理后只能生成一个元素,flatmap处理后可生成多个元素构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)
val a = sc.parallelize(1 to 4, 2) val b =a.flatMap(x => 1 to x) b.collect // Array[Int]= Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)mapPartitions
-
map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:
def mapPartitions[U:ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean =false): RDD[U] -
f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的,举例如下:
val a =sc.parallelize(1 to 9, 3) def myfunc[T](iter:Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while(iter.hasNext) { val cur = iter.next res.::=(pre, cur) pre = cur } res.iterator } a.mapPartitions(myfunc).collect() //Array[(Int,Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
- 上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。
defmapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。
var rdd1 =sc.makeRDD(1 to 5,2) //rdd1有两个分区 var rdd2 =rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(x + "|" +i).iterator } } //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引 rdd2.collect() //Array[String] = Array(0|3, 1|12)mapWith
-
mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:
def mapWith[A:ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean =false)(f: (T, A) => U): RDD[U]- 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
- 第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。
-
举例:把partition index 乘以10加2,作为新的RDD的元素。
val x =sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3) x.mapWith(a =>a * 10)((b, a) => (b,a + 2)).collect() 结果: (1,2) (2,2) (3,2) (4,12) (5,12) (6,12) (7,22) (8,22) (9,22) (10,22)flatMapWith
-
flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:def flatMapWith[A:ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean= false)(f: (T, A) => Seq[U]): RDD[U]
-
举例:
val a =sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect() //res58: Array[Int]= Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)coalesce
-
该函数用于将RDD进行重分区,使用HashPartitioner,使用如下:defcoalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:Ordering[T] = null): RDD[T]
- 第一个参数为重分区的数目
- 第二个为是否进行shuffle,默认为false;
-
实例:
var data= sc.parallelize(1 to 12, 3) data.collect data.partitions.size var rdd1= data.coalesce(1) rdd1.partitions.size var rdd1= data.coalesce(4) rdd1.partitions.size //res2: Int = 1 如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则,分区数不便 var rdd1= data.coalesce(4,true) rdd1.partitions.size //res3: Int = 4repartition
defrepartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]该函数其实就是coalesce函数第二个参数为true的实现
var data= sc.parallelize(1 to 12, 3) data.collect data.partitions.size var rdd1= data. repartition(1) rdd1.partitions.size var rdd1= data. repartition(4) rdd1.partitions.size //res3: Int = 4randomSplit
defrandomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong):Array[RDD[T]] 该函数根据weights权重,将一个RDD切分成多个RDD,干函数有两个参数:
-
第一个参数:权重参数为一个Double数组
-
第二个参数:为random的种子,基本可忽略。
-
实例如下:
var rdd= sc.makeRDD(1 to 12,12) rdd:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21 rdd.collect //Array[Int] =Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) varsplitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2)) splitRDD:Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] atrandomSplit at :23, MapPartitionsRDD[18]at randomSplit at :23, MapPartitionsRDD[19]at randomSplit at :23, MapPartitionsRDD[20]at randomSplit at :23) //这里注意:randomSplit的结果是一个RDD数组 splitRDD.size //res8: Int = 4 //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD, //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到数据的几率就大一些。 //注意,权重的总和加起来为1,否则会不正常 splitRDD(0).collect //res10: Array[Int]= Array(1, 4) splitRDD(1).collect //res11: Array[Int]= Array(3) splitRDD(2).collect //res12: Array[Int]= Array(5, 9) splitRDD(3).collect //res13: Array[Int]= Array(2, 6, 7, 8, 10)glom
def glom():RDD[Array[T]]该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
var rdd= sc.makeRDD(1 to 10,3) rdd: org.apache.spark.rdd.RDD[Int]= ParallelCollectionRDD[38] at makeRDD at :21 rdd.partitions.size //res33: Int =3 该RDD有3个分区 rdd.glom().collect res35:Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组union并集
该函数用于将两个数据集合并为一个数据集
val rdd1 =sc.parallelize(List(5, 6, 4, 3)) val rdd2 =sc.parallelize(List(1, 2, 3, 4)) //求并集 val rdd3 =rdd1.union(rdd2) rdd3.collectdistinct
该函数将两个数据集合并的基础上去重生成一个新的数据集
val rdd1 =sc.parallelize(List(5, 6, 4, 3)) val rdd2 =sc.parallelize(List(1, 2, 3, 4)) //求并集 val rdd3 =rdd1.union(rdd2) //去重输出 rdd3.distinct.collect总结
不积跬步无以至千里,不要忽略任何渺小的问题和成长,每天积攒一点点,终有一天量变引起质变。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)