5.2 Two-Value Type Operations (Between Two RDDs)
5.2.1 union
- Purpose: returns a new RDD that is the union of the source RDD and the argument RDD
# (1) Create the first RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

# (2) Create the second RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

# (3) Compute the union of the two RDDs
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28

# (4) Print the union result
scala> rdd3.collect()
res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
5.2.2 subtract
- Purpose: computes a difference; elements that also appear in the other RDD are removed, and the remaining elements of the source RDD are kept
- Example: create two RDDs and compute the difference of the first RDD minus the second
# (1) Create the first RDD
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24

# (2) Create the second RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24

# (3) Compute the difference of the first RDD minus the second and print it
scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)
5.2.3 intersection
- Purpose: returns a new RDD that is the intersection of the source RDD and the argument RDD
- Example: create two RDDs and compute their intersection
# (1) Create the first RDD
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

# (2) Create the second RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

# (3) Compute the intersection of the two RDDs
scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28

# (4) Print the result
scala> rdd3.collect()
res19: Array[Int] = Array(5, 6, 7)
5.2.4 sample
sample draws a random sample from the full dataset according to the given parameters.
- Three parameters: withReplacement (true for sampling with replacement, false for sampling without replacement), fraction (the expected fraction of elements to sample), and seed (the random seed).
- Example: randomly sample a website access log and count the sampled entries per date (a minimal sketch of the three parameters follows the example below).
package com.zch.spark.core.exercise

import org.apache.spark.{SparkConf, SparkContext}

object Exercise_SparkCoreDemo07 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("demo7")
    val sc = new SparkContext(conf)

    // Forward slashes avoid invalid escape sequences in the Windows path literal
    val rdd = sc.textFile("C:/Users/Administrator/Desktop/test")
    rdd.sample(false, 0.5, 1)
      .map(line => {
        // Extract the date portion of the 4th field, e.g. 09/Nov/2021
        val str = line.split(" ")(3).substring(1, 12)
        (str, 1)
      })
      .reduceByKey(_ + _)
      .foreach(println)
    //(09/Nov/2021,216)
    //(08/Nov/2021,23458)
    //(31/Oct/2021,4775)
    //(02/Nov/2021,6114)
    //(01/Nov/2021,7408)
    //(03/Nov/2021,6659)
    //(04/Nov/2021,6723)
    //(05/Nov/2021,7999)
    //(06/Nov/2021,4284)
    //(07/Nov/2021,4155)
  }
}
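For a more minimal look at the three parameters on their own, here is a small sketch run in the Spark shell (the elements actually drawn depend on the seed and Spark version, so no output is shown):

// Sketch only: sampling from 1 to 10 in the Spark shell.
val data = sc.parallelize(1 to 10)

// Without replacement: each element appears at most once in the sample.
data.sample(withReplacement = false, fraction = 0.5, seed = 1L).collect()

// With replacement: the same element may be drawn more than once,
// and fraction is the expected number of times each element is drawn.
data.sample(withReplacement = true, fraction = 2.0, seed = 1L).collect()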
5.2.5 combineByKey
combineByKey can change the type of the value produced by the aggregation. If the method's input value type and output value type are the same, it is equivalent to reduceByKey (a complete runnable sketch follows the two snippets below).
// rdd1 is assumed to be a pair RDD whose values are Int, defined earlier
val rdd2 = rdd1.combineByKey[Long](
  (v: Int) => v.toLong,            // createCombiner: turn the first value of a key into a Long accumulator
  (c: Long, v: Int) => c + v,      // mergeValue: fold another value into the accumulator
  (c1: Long, c2: Long) => c1 + c2  // mergeCombiners: merge accumulators from different partitions
)
rdd2.foreach(println)
// Equivalent to reduceByKey: input and output value types are both Int
rdd1.combineByKey[Int](
  (v: Int) => v,
  (c: Int, v: Int) => c + v,
  (c1: Int, c2: Int) => c1 + c2
)
  .foreach(println)
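Putting the pieces together, a minimal self-contained sketch for the Spark shell (the input pair RDD below is made up for illustration; the snippets above assume an rdd1 defined earlier):

val pairs = sc.makeRDD(List("a" -> 1, "a" -> 2, "b" -> 3, "b" -> 4, "b" -> 5))

// Sum the Int values per key, accumulating them as Long:
val summed = pairs.combineByKey[Long](
  (v: Int) => v.toLong,            // createCombiner
  (c: Long, v: Int) => c + v,      // mergeValue
  (c1: Long, c2: Long) => c1 + c2  // mergeCombiners
)
summed.collect().foreach(println)
// e.g. (a,3) (b,12) -- order may vary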
5.2.6 sortBy and sortByKey
sortBy is implemented on top of sortByKey, so the two have roughly the same performance. The difference is that sortBy sorts by a key computed from each element (one that does not have to exist in the collection), whereas sortByKey can only sort by the key that is already present in a (K, V) collection.
package com.zch.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkCoreDemo09_transformations {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("demo9")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int, Double, Int)] = sc.makeRDD(List(
      ("amos", 18, 10.25, 178),
      ("tom", 28, 2.5, 50),
      ("jerry", 16, 5000.0, 20)
    ))

    // Sorting
    // rdd.sortBy()
    // def sortBy[K](
    //     f: (T) => K,
    //     ascending: Boolean = true,
    //     numPartitions: Int = this.partitions.length)
    rdd.sortBy(t => t._2 * t._3)  // sort by a derived key: product of the 2nd and 3rd fields
      .foreach(println)

    // rdd.sortByKey()
    rdd.map(t => {
      (t._3, t)  // promote the 3rd field to the key, then sort by that key
    })
      .sortByKey(numPartitions = 1)
      .map(_._2)
    // .foreach(println)
  }
}
5.2.7 join
RDD1.join(RDD2): keeps only the keys that exist in both RDDs.
RDD1.leftOuterJoin(RDD2): takes the left RDD as the base; for a key in RDD1 with no match in RDD2, the right side is None, otherwise Some(value).
RDD1.rightOuterJoin(RDD2): the same idea, but with the right RDD as the base (a small sketch follows the code below).
RDD1.fullOuterJoin(RDD2): a full outer join; keys from both RDDs appear, with Some where a value exists and None where it does not.
// join
// RDD[(K, V)].join
// def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
// def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
// def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
// def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
val rdd1: RDD[(String, Int)] = sc.makeRDD(List("k1" -> 1, "k2" -> 2))
val rdd2: RDD[(String, Double)] = sc.makeRDD(List("k1" -> 1.1, "k3" -> 2.2))

rdd1.join(rdd2)
  .foreach(println)
//(k1,(1,1.1))

rdd1.leftOuterJoin(rdd2)
  .foreach(println)
//(k2,(2,None))
//(k1,(1,Some(1.1)))

rdd1.fullOuterJoin(rdd2)
  .foreach(println)
//(k2,(Some(2),None))
//(k3,(None,Some(2.2)))
//(k1,(Some(1),Some(1.1)))
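rightOuterJoin is not exercised above; a small sketch using the same rdd1 and rdd2 (output order may vary):

rdd1.rightOuterJoin(rdd2)
  .foreach(println)
//(k1,(Some(1),1.1))
//(k3,(None,2.2))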
5.2.8 groupByKey and cogroup
groupByKey: groups values that share the same key into one group; when two datasets are combined by key, a key with no counterpart on the other side is simply dropped.
cogroup: a key that has no counterpart in the other RDD still appears, paired with an empty collection, which neatly solves the "key is dropped when it has no match" problem (see the sketch after the code below).
val rdd3 = sc.makeRDD(List(
  "huawei" -> 8754, "huawei" -> 53465,
  "xiaomi" -> 678, "xiaomi" -> 235, "xiaomi" -> 2388,
  "vivo" -> 678, "vivo" -> 456, "vivo" -> 7489,
  "oppo" -> 234
))
val rdd4 = sc.makeRDD(List(
  "huawei" -> 875, "huawei" -> 53465, "huawei" -> 111,
  "xiaomi" -> 678, "xiaomi" -> 235,
  "oppo" -> 234
))
val rdd5 = sc.makeRDD(List(
  "htc" -> 875,
  "nokia" -> 53465
))

// def groupByKey(): RDD[(K, Iterable[V])]
rdd3.groupByKey()
  .map(t => (t._1, t._2.toList))
  .foreach(println)
//(oppo,List(234))
//(vivo,List(678, 456, 7489))
//(huawei,List(8754, 53465))
//(xiaomi,List(678, 235, 2388))
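cogroup itself is not shown above; a sketch using rdd3 and rdd5 from the snippet (the two RDDs share no keys, yet every key still appears, with an empty collection on the missing side; output order may vary):

// def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
rdd3.cogroup(rdd5)
  .map(t => (t._1, (t._2._1.toList, t._2._2.toList)))
  .foreach(println)
// e.g. (huawei,(List(8754, 53465),List()))
//      (htc,(List(),List(875)))
//      (nokia,(List(),List(53465)))
//      ...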
5.2.9 cartesian (Cartesian Product)
// def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val rdd6 = sc.makeRDD(List("油泼面", "饺子", "火锅", "回锅肉"))
val rdd7 = sc.makeRDD(List("冰峰", "酸梅汤", "可乐"))
rdd6.cartesian(rdd7)
  .foreach(println)
//(油泼面,冰峰)
//(油泼面,酸梅汤)
//(饺子,冰峰)
//(油泼面,可乐)
//(饺子,酸梅汤)
//(饺子,可乐)
//(火锅,冰峰)
//(回锅肉,冰峰)
//(火锅,酸梅汤)
//(火锅,可乐)
//(回锅肉,酸梅汤)
//(回锅肉,可乐)
5.2.10 pipe
pipe takes a system command (a shell command, shell script, or Windows DOS command), feeds each element of the source RDD to that command or script through its standard input, collects the command's output into a new RDD, and returns it.
#!/bin/bash
echo "task started"
while read X
do
  echo "received $X"
done
echo "task finished"
val rdd = sc.makeRDD(1 to 10)
rdd.pipe("/root/test.sh").collect
Result: the script runs once per partition, so the collected array contains, for each partition, the "task started" and "task finished" banner lines plus one "received N" line per element of that partition.
5.2.11 mapValues
Operates only on the V of a (K, V)-typed RDD, leaving the keys unchanged.
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24

scala> rdd3.mapValues(_+"|||").collect()
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))