- 概述
- Transformation(转换算子)
- 1. map
- 2. flatMap
- 3. filter
- 4. mapPartitions
- 5. mapPartitionsWithIndex
- 6. sample
- 7. mapValues
- 8. union(并集)
- 9. substract(差集)
- 10. reduceByKey
- 11. groupByKey
- 12. combineByKey
- 13. foldByKey
- 14. aggregateByKey
- 15. join
- 16. sortBy
- 17. repartition
- Action(执行算子)
- 1. reduce
- 2. foreach
- 3. count、countByKey、countByValue
- 4. take、takeSample、first
- 5. max、min、mean、sum(数字运算)
- 代码汇总
- transformation 部分代码汇总
- action 部分代码汇总
概述
对于 Spark 处理的大量数据而言,会将数据切分后放入RDD作为Spark 的基本数据结构,开发者可以在 RDD 上进行丰富的 *** 作,之后 Spark 会根据 *** 作调度集群资源进行计算。总结起来,RDD 的 *** 作主要可以分为 Transformation 和 Action 两种。
官方文档
- (1)Transformation 转换 *** 作:返回一个新的RDD
- which create a new dataset from an existing one
- 所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发
- (2)Action动作 *** 作:返回值不是RDD(无返回值或返回其他的)
- which return a value to the driver program after running a computation on the datase
- 所有Action函数立即执行(Eager),比如count、first、collect、take等
此外注意RDD中函数细节:
- 第一点:RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数);
- 第二点:RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行。之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD *** 作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行
Transformation(转换算子)
在Spark中Transformation *** 作表示将一个RDD通过一系列 *** 作变为另一个RDD的过程,这个 *** 作可能是简单的加减 *** 作,也可能是某个函数或某一系列函数。值得注意的是Transformation *** 作并不会触发真正的计算,只会建立RDD间的关系图。
如下图所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从 V1、V2、U1、U2、U3、U4 采样出数据 V1、U1 和 U4,形成新的RDD。
1. map
源码:
def map[U](f : scala.Function1[T, U])(implicit evidence : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { }
- 表示将 RDD 经由某一函数 f 后,转变为另一个RDD。
只需要传入一个函数即可,如下代码,将原来的Seq集合中每个元素都乘以10,再返回结果,如下:
@Test def mapTest(): Unit = { // 1. 创建RDD val rdd1 = sc.parallelize(Seq(1, 2, 3)) // 2. 执行 map *** 作 val rdd2 = rdd1.map(item => item * 10) // 3. 得到结果 val result = rdd2.collect() //通过调用collect来返回一个数组,然后打印输出 result.foreach(item => println(item)) }
运行结果:
10 20 30 Process finished with exit code 0
2. flatMap
源码:
def flatMap[U](f : scala.Function1[T, scala.TraversableOnce[U]])(implicit evidence : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { }
- 表示将 RDD 经由某一函数 f 后,转变为一个新的 RDD,但是与 map 不同,RDD 中的每一个元素会被映射成新的 0 到多个元素(f 函数返回的是一个序列 Seq)。
代码演示:
@Test def flatMapTest() = { // 1. 创建RDD val rdd1 = sc.parallelize(Seq("Hello 吕布", "Hello 貂蝉", "Hello 铠")) // 2. 处理数据 val rdd2 = rdd1.flatMap(item => item.split(" ")) // 3. 查看结果 val result = rdd2.collect() result.foreach(item => println(item)) // 4. 关闭资源 sc.stop() }
运行结果:
Hello 吕布 Hello 貂蝉 Hello 铠 Process finished with exit code 0
3. filter
源码:
def filter(f : scala.Function1[T, scala.Boolean]) : org.apache.spark.rdd.RDD[T] = { }
- filter 可以过滤掉数据集中的一部分元素
- filter 中接受的函数,参数是每一个元素,如果这个函数返回true,当前元素就会被加入新数据集,如果返回false,当前元素会被过滤掉
代码演示:
@Test def filter() = { sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .filter(item => item % 2 == 0) //取偶数 .collect() .foreach(item => println(item)) }
运行结果:
2 4 6 8 10 Process finished with exit code 0
4. mapPartitions
源码:
def mapPartitions[U](f : scala.Function1[scala.Iterator[T], scala.Iterator[U]], preservesPartitioning : scala.Boolean = { })(implicit evidence : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { }
- mapPartitions 和 map算子一样,只不过map是针对每一条数据进行转换,mapPartitions针对一整个分区的数据进行转换
- map的func参数是单条数据,mapPartitions的func参数是一个集合(一个分区所有的数据)
- map的func返回值也是单条数据,mapPartitions的func返回值是一个集合
代码演示:
@Test def mapPartitions(): Unit = { // 1. 数据生成 // 2. 算子使用 // 3. 获取结果 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitions(iter => { iter.foreach(item => println(item)) iter }) .collect() }
运行结果:
1 4 2 5 3 6 Process finished with exit code 0
如果想给上述集合中的元素都乘以10该,如何 *** 作?
@Test def mapPartitions2(): Unit = { // 1. 数据生成 // 2. 算子使用 // 3. 获取结果 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitions(iter => { // 如果想给集合中的数字都乘10,该如何 *** 作? // 遍历 iter 其中每一条数据进行转换,转换完成之后,返回 iter val result = iter.map(item => item * 10) //注意这个的map算子并不是RDD中的,而是Scala中的 result }) .collect() .foreach(item => println(item)) }
运行结果:
10 20 30 40 50 60 Process finished with exit code 0
5. mapPartitionsWithIndex
源码:
def mapPartitionsWithIndex[U](f : scala.Function2[scala.Int, scala.Iterator[T], scala.Iterator[U]], preservesPartitioning : scala.Boolean = { })(implicit evidence : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { }
- mapPartitionsWithIndex 和 mapPartitions 的区别是 func 参数中多了一个参数,分区号
@Test def mapPartitionsWithIndex(): Unit = { sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitionsWithIndex( (index, iter) => { println("index: " + index) iter.foreach(item => println(item)) iter } ) .collect() }
运行结果:
index: 0 1 2 3 index: 1 4 5 6 Process finished with exit code 0
运行结果也有可能是这样:原因是RDD的并发性质
index: 1 index: 0 4 5 6 1 2 3 Process finished with exit code 0
6. sample
源码:
def sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long = { }) : org.apache.spark.rdd.RDD[T] = { }
- 采样,尽可能减少数据集的规律损失
- withReplacement 参数决定有放回或者无放回采样
- fraction 参数是采样比例
@Test def sample() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) val rdd2 = rdd1.sample(false, 0.6) //第一个参数为false代表无放回采样,0.6是采样比例 val result = rdd2.collect() result.foreach(item => println(item)) }
运行结果:
3 4 5 6 7 8 9 Process finished with exit code 0
7. mapValues
源码:
def mapValues[U](f : scala.Function1[V, U]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, U]] = { }
- mapValues 也是 map,只不过map作用于整条数据,mapValues作用于 Value
@Test def mapValues() = { sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) .mapValues(item => item * 10) .collect() .foreach(println(_)) }
运行结果:
(a,10) (b,20) (c,30) Process finished with exit code 0
8. union(并集)
@Test def union() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.union(rdd2) .collect() .foreach(println(_)) }
运行结果:
1 2 3 4 5 3 4 5 6 7 Process finished with exit code 0
9. substract(差集)
@Test def subtract() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.subtract(rdd2) //rdd1-rdd2 .collect() .foreach(println(_)) }
运行结果:
1 2 Process finished with exit code 0
10. reduceByKey
源码:
def reduceByKey(func : scala.Function2[V, V, V]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { }
- 聚合 *** 作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定)
@Test def reduceByKey() = { // 1.创建RDD val rdd1 = sc.parallelize(Seq("Hello 吕布", "Hello 貂蝉", "Hello 铠")) // 2.处理数据 val rdd2 = rdd1.flatMap(item => item.split(" ")) .map(item => (item, 1)) .reduceByKey((curr, agg) => curr + agg) //注意agg是一个临时变量,或者局部结果,起始值为0 // 3.得到结果 val result = rdd2.collect() result.foreach(item => println(item)) // 4.关闭资源 sc.stop()
运行结果:
(铠,1) (貂蝉,1) (Hello,3) (吕布,1) Process finished with exit code 0
11. groupByKey
RDD中groupByKey和reduceByKey区别???
- reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。简而言之,分组聚合。
- groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。简而言之就是只分组,不聚合。
@Test def groupByKey() = { sc.parallelize(Seq(("a", 1), ("a", 1), ("c", 3))) .groupByKey() .collect() .foreach(println(_)) //只有一个参数打印输出可以简写 }
运行结果:
(a,CompactBuffer(1, 1)) (c,CompactBuffer(3)) Process finished with exit code 0
12. combineByKey
源码:
def combineByKey[C]( createCombiner : scala.Function1[V, C], mergevalue : scala.Function2[C, V, C], mergeCombiners : scala.Function2[C, C, C] ) : org.apache.spark.rdd.RDD[scala.Tuple2[K, C]] = { }
- CombineByKey 算子中接受三个参数:
- 转换数据的函数(初始函数,作用于第一条数据,用于开启整个计算),在分区上进行聚合,把所有分区的聚合结果聚合为最终结果
@Test def combineByKey() = { // 1.准备集合 val rdd: RDD[(String, Double)] = sc.parallelize(Seq( ("铠", 100.0), ("耀", 99.0), ("镜", 99.0), ("镜", 98.0), ("铠", 97.0) )) // 2.算子 *** 作 // 2.1 createCombiner 转换数据 // 2.2 mergevalue 分区上的聚合 // 2.3 mergeCombiners 把分区上的结果再次聚合,生成最终结果 val combineResult = rdd.combineByKey( createCombiner = (curr: Double) => (curr, 1), mergevalue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1), mergeCombiners = (curr: (Double, Int), agg:(Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2) ) val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2)) // 3. 输出数据 resultRDD.collect().foreach(println(_)) }
运行结果:
(铠,98.5) (耀,99.0) (镜,98.5) Process finished with exit code 0
13. foldByKey
源码:
def foldByKey(zeroValue : V)(func : scala.Function2[V, V, V]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { }
- foldByKey 和 reduceByKey 的区别是可以指定初始值
- foldByKey 和 Scala中的 foldLeft、foldRight 区别是,这个初始值作用于每一个数据
@Test def foldByKey() = { sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1))) .foldByKey(10)((curr, agg) => curr + agg) .collect() .foreach(println(_)) }
运行结果:
(a,22) (b,11) Process finished with exit code 0
14. aggregateByKey
源码:
def aggregateByKey[U](zeroValue : U)(seqOp : scala.Function2[U, V, U], combOp : scala.Function2[U, U, U])(implicit evidence : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[scala.Tuple2[K, U]] = { }
- aggregateByKey(zeroValue)(seqOp, combOp)
- zeroValue:指定初始值
- seqOp:作用于每个元素,根据初始值,进行计算
- combOp:将 seqOp 处理过的结果进行聚合
- aggregateByKey 比较适合针对每个数据要先处理,后聚合的场景
@Test def aggregateByKey() = { val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0))) rdd.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg) .collect() .foreach(println(_)) }
运行结果:
(手机,20.0) (电脑,16.0) Process finished with exit code 0
15. join
源码:
def join[W]( other : org.apache.spark.rdd.RDD[scala.Tuple2[K, W]] ) : org.apache.spark.rdd.RDD[scala.Tuple2[K, scala.Tuple2[V, W]]] = { }
@Test def join() = { val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1))) val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("b", 12))) rdd1.join(rdd2) .collect() .foreach(println(_)) }
运行结果:
(a,(1,10)) (a,(1,11)) (a,(2,10)) (a,(2,11)) (b,(1,12)) Process finished with exit code 0
16. sortBy
源码:
def sortBy[K](f : scala.Function1[T, K], ascending : scala.Boolean = { }, numPartitions : scala.Int = { })(implicit ord : scala.Ordering[K], ctag : scala.reflect.ClassTag[K]) : org.apache.spark.rdd.RDD[T] = { }
- sortBy 可以用于任何类型数据的RDD,sortByKey 只有 KV 类型数据的RDD中才有
- sortBy 可以按照任何部分顺序来排序,sortByKey 只能按照 Key 来排序
- sortByKey 写发简单,不用编写函数了
@Test def sort() = { val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 1, 8)) val rdd2 = sc.parallelize(Seq(("a", 1), ("b", 3), ("c", 2))) println("-----------------------------") rdd1.sortBy(item => item).collect().foreach(println(_)) println("-----------------------------") rdd2.sortBy(item => item._2).collect().foreach(println(_)) println("-----------------------------") rdd2.sortByKey().collect().foreach(println(_)) }
运行结果:
----------------------------- 1 1 2 4 5 8 ----------------------------- (a,1) (c,2) (b,3) ----------------------------- (a,1) (b,3) (c,2) Process finished with exit code 0
17. repartition
源码:
def repartition(numPartitions : scala.Int)(implicit ord : scala.Ordering[T] = { }) : org.apache.spark.rdd.RDD[T] = { }
- repartition 进行重分区的时候,默认是 shuffle 的
- coalesce 进行重分区的时候,默认是不 shuffle 的,coalesce 默认不能增大分区数
@Test def partitioning() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2) println(rdd.repartition(5).partitions.size) println(rdd.repartition(1).partitions.size) println(rdd.coalesce(5, shuffle = true ).partitions.size) } }
5 1 5 Process finished with exit code 0
Action(执行算子)
不同于Transformation *** 作,Action *** 作代表一次计算的结束,不再产生新的 RDD,将结果返回到Driver程序或者输出到外部。所以Transformation *** 作只是建立计算关系,而Action *** 作才是实际的执行者。每个Action *** 作都会调用SparkContext的runJob 方法向集群正式提交请求,所以每个Action *** 作对应一个Job。
1. reduce
源码:
def reduce(f : scala.Function2[T, T, T]) : T = { }
- 函数中传入的 curr参数,并不是 Value,而是一整条数据
- reduce 整体上的结果,只有一个
- 聚合的时候,往往需要聚合 中间临时变量
@Test def reduce() = { val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0))) val result: (String, Double) = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2)) println(result) // reduce的结果是一个元组 }
运行结果:
(总价,45.0) Process finished with exit code 0
2. foreach
源码:
def foreach(f : scala.Function1[T, scala.Unit]) : scala.Unit = { }
- RDD中自带的foreach算子,注意输出的结果顺序不一定按照原来Seq集合中的顺序,是因为RDD是并行计算,异步 *** 作。
@Test def foreach() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4)) rdd.foreach(item => println(item)) }
运行结果:
3 1 2 4 Process finished with exit code 0
3. count、countByKey、countByValue
- count 和 countByKey 的结果相距很远,每次调用 Action 都会生成一个 job,job 会运行获取结果,所以在俩个 job中间有大量的 Log,其实就是在启动job
- countByKey的运算结果是一个Map型数据:Map(a -> 2, b -> 1, c -> 1)
- 数据倾斜:如果要解决数据倾斜,是不是要先知道谁倾斜,通过countByKey可以查看Key对应的数量,从而解决倾斜问题
@Test def count() = { val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4))) println(rdd.count()) // 求出集合中数据的总数 println(rdd.countByKey()) // 得出 Key println(rdd.countByValue()) }
运行结果:
4 Map(a -> 2, b -> 1, c -> 1) Map((b,2) -> 1, (c,3) -> 1, (a,1) -> 1, (a,4) -> 1) Process finished with exit code 0
4. take、takeSample、first
- take() 和 takeSample() 都是获取数据,一个是直接获取,一个是采样获取(又放回、无放回)
- first:一般情况下,action 会从所有分区获取数据,相对来说速度比较慢,first 只是获取第一个元素所有只会处理第一个分区,所以速度很快,无需处理所有数据
@Test def take() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6)) rdd.take(3).foreach(item => println(item)) // 返回前N个数据 println(rdd.first()) // 返回第一个元素 rdd.takeSample(withReplacement = false, num = 3).foreach(item => println(item)) }
运行结果:
1 2 3 1 2 1 5 Process finished with exit code 0
5. max、min、mean、sum(数字运算)
- 没有中位数,缺陷!
@Test def numberic() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 10, 99, 120, 7)) println(rdd.max()) // 最大值 println(rdd.min()) // 最小值 println(rdd.mean()) // 均值 println(rdd.sum()) //求和 }
运行结果:
120 1 27.888888888888893 251.0 Process finished with exit code 0
代码汇总 transformation 部分代码汇总
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test class TransformationOp { val conf: SparkConf = new SparkConf().setAppName("transformation_op").setMaster("local[6]") val sc = new SparkContext(conf) @Test def mapPartitions(): Unit = { // 1. 数据生成 // 2. 算子使用 // 3. 获取结果 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitions(iter => { iter.foreach(item => println(item)) iter }) .collect() } @Test def mapPartitions2(): Unit = { // 1. 数据生成 // 2. 算子使用 // 3. 获取结果 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitions(iter => { // 如果想给集合中的数字都乘10,该如何 *** 作? // 遍历 iter 其中每一条数据进行转换,转换完成之后,返回 iter val result = iter.map(item => item * 10) //注意这个的map算子并不是RDD中的,而是Scala中的 result }) .collect() .foreach(item => println(item)) } @Test def mapPartitionsWithIndex(): Unit = { sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2) .mapPartitionsWithIndex( (index, iter) => { println("index: " + index) iter.foreach(item => println(item)) iter } ) .collect() } @Test def filter() = { sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .filter(item => item % 2 == 0) //取偶数 .collect() .foreach(item => println(item)) } @Test def sample() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) val rdd2 = rdd1.sample(false, 0.6) //第一个参数为false代表无放回采样,0.6是采样比例 val result = rdd2.collect() result.foreach(item => println(item)) } @Test def mapValues() = { sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) .mapValues(item => item * 10) .collect() .foreach(println(_)) } @Test def intersection() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.intersection(rdd2) .collect() .foreach(println(_)) } @Test def union() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.union(rdd2) .collect() .foreach(println(_)) } @Test def subtract() = { val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.subtract(rdd2) //rdd1-rdd2 .collect() .foreach(println(_)) } @Test def groupByKey() = { sc.parallelize(Seq(("a", 1), ("a", 1), ("c", 3))) .groupByKey() .collect() .foreach(println(_)) //只有一个参数打印输出可以简写 } @Test def combineByKey() = { // 1.准备集合 val rdd: RDD[(String, Double)] = sc.parallelize(Seq( ("铠", 100.0), ("耀", 99.0), ("镜", 99.0), ("镜", 98.0), ("铠", 97.0) )) // 2.算子 *** 作 // 2.1 createCombiner 转换数据 // 2.2 mergevalue 分区上的聚合 // 2.3 mergeCombiners 把分区上的结果再次聚合,生成最终结果 val combineResult = rdd.combineByKey( createCombiner = (curr: Double) => (curr, 1), mergevalue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1), mergeCombiners = (curr: (Double, Int), agg:(Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2) ) val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2)) // 3. 输出数据 resultRDD.collect().foreach(println(_)) } @Test def foldByKey() = { sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1))) .foldByKey(10)((curr, agg) => curr + agg) .collect() .foreach(println(_)) } @Test def aggregateByKey() = { val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0))) rdd.aggregateByKey(0.8)((zeroValue, item) => item * zeroValue, (curr, agg) => curr + agg) .collect() .foreach(println(_)) } @Test def join() = { val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1))) val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("b", 12))) rdd1.join(rdd2) .collect() .foreach(println(_)) } @Test def sort() = { val rdd1 = sc.parallelize(Seq(2, 4, 1, 5, 1, 8)) val rdd2 = sc.parallelize(Seq(("a", 1), ("b", 3), ("c", 2))) println("-----------------------------") rdd1.sortBy(item => item).collect().foreach(println(_)) println("-----------------------------") rdd2.sortBy(item => item._2).collect().foreach(println(_)) println("-----------------------------") rdd2.sortByKey().collect().foreach(println(_)) } @Test def partitioning() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2) println(rdd.repartition(5).partitions.size) println(rdd.repartition(1).partitions.size) println(rdd.coalesce(5, shuffle = true ).partitions.size) } }
action 部分代码汇总
import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test class ActionOp { val conf = new SparkConf().setMaster("local[6]").setAppName("action_op") val sc = new SparkContext(conf) @Test def reduce() = { val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0))) val result: (String, Double) = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2)) println(result) // reduce的结果是一个元组 } @Test def foreach() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4)) rdd.foreach(item => println(item)) } @Test def count() = { val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4))) println(rdd.count()) // 求出集合中数据的总数 println(rdd.countByKey()) // 得出 Key println(rdd.countByValue()) } @Test def take() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6)) rdd.take(3).foreach(item => println(item)) // 返回前N个数据 println(rdd.first()) // 返回第一个元素 rdd.takeSample(withReplacement = false, num = 3).foreach(item => println(item)) } // 等等数字运算... 注意对于数字类型的支持,都是Action @Test def numberic() = { val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 10, 99, 120, 7)) println(rdd.max()) // 最大值 println(rdd.min()) // 最小值 println(rdd.mean()) // 均值 println(rdd.sum()) //求和 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)