算子 : Operator( *** 作)
RDD的方法和Scala集合对象的方法不一样
Scala集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行, 为了和Scala集合对象区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的 *** 作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
Value 类型 map 转换
map *** 作只能是来一个计算一个,出去一个
map: 转换映射功能,及把一个数据转换成一个新的数据, 把A变成B
匿名函数:只关系逻辑不关心方法名,能省则省,参数名可以推断出来及不需要声明参数类型,逻辑只有一句话则不需要{},入参只一个可以不写()
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - map val rdd = sc.makeRDD(List(1,2,3,4)) // 1,2,3,4 // 2,4,6,8 // 转换函数:需要可以转入参数,再将改变后的参数输出 def mapFunction(num:Int): Int = { num * 2 } //val mapRDD: RDD[Int] = rdd.map(mapFunction) //val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2}) //val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2) //val mapRDD: RDD[Int] = rdd.map((num)=>num*2) //val mapRDD: RDD[Int] = rdd.map(num=>num*2) val mapRDD: RDD[Int] = rdd.map(_*2) mapRDD.collect().foreach(println) sc.stop() } }
小功能:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
apache.log:
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png 83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
代码实现
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - map val rdd = sc.textFile("datas/apache.log") // 长的字符串 // 短的字符串 val mapRDD: RDD[String] = rdd.map( line => { val datas = line.split(" ") datas(6) } ) mapRDD.collect().foreach(println) sc.stop() } }
执行顺序
// 1. rdd的计算一个分区内的数据是一个一个执行逻辑
只使用一个分区,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
分区内数据的执行是有序的。
// 2. 不同分区数据计算是无序的。
RDD所有算子转换 *** 作转换后的数据分区不变,除分组之外
不因为算子 *** 作后数据进去其他分区,提高计算效率。
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Operator_Transform_Part { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - map val rdd = sc.makeRDD(List(1,2,3,4),2) // 分区1【1,2】,分区2【3,4】 rdd.saveAsTextFile("output") val mapRDD = rdd.map(_*2) //转换之后的数据还是存在原来的分区 // 分区1【2,4】,分区2【6,8】 mapRDD.saveAsTextFile("output1") sc.stop() } }mapValues
mapValues(func)
功能:对键值对每个value都应用一个函数,但是,key不会发生变化。
示例
val list = List("hadoop","spark","hive","spark") val rdd = sc.parallelize(list) val pairRdd = rdd.map(x => (x,1)) pairRdd.mapValues(_+1).collect.foreach(println)//对每个value进行+1
结果
(hadoop,2) (spark,2) (hive,2) (spark,2)mapPartitions
- map *** 作只能是来一个计算一个,出去一个没有缓存的概念及来一批数据处理数据性能不高。
- mapPartitions是先拿到一个分区内的所有数据再做处理, 及mapPartitions : 可以以分区为单位进行数据转换 *** 作, 但是会将整个分区的数据加载到内存进行引用, 如果处理完的数据是不会被释放掉,存在对象的引用。 在内存较小,数据量较大的场合下,容易出现内存溢出。
- MapPartitions 算子需要传递一个迭代器,返回一个迭代器
val dataRDD1: RDD[Int] = dataRDD.mapPartitions( datas => { datas.filter(_==2) } )
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - mapPartitions val rdd = sc.makeRDD(List(1,2,3,4), 2) // mapPartitions : 可以以分区为单位进行数据转换 *** 作 // 但是会将整个分区的数据加载到内存进行引用 // 如果处理完的数据是不会被释放掉,存在对象的引用。 // 在内存较小,数据量较大的场合下,容易出现内存溢出。 val mpRDD: RDD[Int] = rdd.mapPartitions( iter => { println(">>>>>>>>>>") iter.map(_ * 2) } ) mpRDD.collect().foreach(println) sc.stop() } }
小功能:获取每个数据分区的最大值
map 和 mapPartitions 的区别package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - mapPartitions val rdd = sc.makeRDD(List(1,2,3,4), 2) // 【1,2】,【3,4】 // 【2】,【4】 val mpRDD = rdd.mapPartitions( iter => { List(iter.max).iterator } ) mpRDD.collect().foreach(println) sc.stop() } }mapPartitionsWithIndex
根据分区号选择计算哪个分区数据
val dataRDD1 = dataRDD.mapPartitionsWithIndex( (index, datas) => { datas.map(index, _) } )
小功能:获取第二个数据分区的数据
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.{SparkConf, SparkContext} object Spark03_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - mapPartitions val rdd = sc.makeRDD(List(1,2,3,4), 2) // 【1,2】,【3,4】 val mpiRDD = rdd.mapPartitionsWithIndex( (index, iter) => { //如果分区号是1计算 if ( index == 1 ) { iter } else {//其他分布不计算 Nil.iterator } } ) mpiRDD.collect().foreach(println) sc.stop() } }
查看数据在哪个分区
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.{SparkConf, SparkContext} object Spark03_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - mapPartitions val rdd = sc.makeRDD(List(1,2,3,4)) val mpiRDD = rdd.mapPartitionsWithIndex( (index, iter) => { // 1, 2, 3, 4 //(0,1)(2,2),(4,3),(6,4) //查看数据在哪个分区 iter.map( num => { (index, num) } ) } ) mpiRDD.collect().foreach(println) sc.stop() } }flatMap 扁平化映射
flatMap: 把一条数据拆分成一个一个个体使用
val dataRDD = sparkContext.makeRDD(List( List(1,2),List(3,4) ),1) val dataRDD1 = dataRDD.flatMap( list => list )
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark04_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - flatMap val rdd: RDD[String] = sc.makeRDD(List( "Hello Scala", "Hello Spark" )) val flatRDD: RDD[String] = rdd.flatMap( s => { s.split(" ") } ) flatRDD.collect().foreach(println) sc.stop() } }
小功能:将 List(List(1,2),3,List(4,5))进行扁平化 *** 作
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark04_RDD_Operator_Transform2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - flatMap val rdd = sc.makeRDD(List(List(1,2),3,List(4,5))) val flatRDD = rdd.flatMap( data => { data match { case list:List[_] => list case dat => List(dat) } } ) flatRDD.collect().foreach(println) sc.stop() } }glom 聚合
glomg和flatMap的作用相反
flatMap把一个拆成多个 list->int
glomg把多个合成一个 int -> array
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4 ),1) val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark05_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - glom val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2) // List => Int // Int => Array //返回的是一个数组 val glomRDD: RDD[Array[Int]] = rdd.glom() glomRDD.collect().foreach(data=> println(data.mkString(","))) sc.stop() } }
小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark05_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - glom val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2) // 【1,2】,【3,4】 // 【2】,【4】 // 【6】 //把一个分区中数据放入一个数组中 val glomRDD: RDD[Array[Int]] = rdd.glom() //在取出每个数组中的最大值 val maxRDD: RDD[Int] = glomRDD.map( array => { array.max } ) //求和 println(maxRDD.collect().sum) sc.stop() } }groupBy 分组
// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
// 相同的key值的数据会放置在一个组中
分组和分区没有必然的关系,分组后的数据可能被放在一个分区,也可能放在多个分区,及分组会把数据打乱再重新组合
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) val dataRDD1 = dataRDD.groupBy( _%2 )
奇偶分组
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark06_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - groupBy val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2) // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组 // 相同的key值的数据会放置在一个组中 def groupFunction(num:Int) = { //取模以后的结果当做key,及根据0,1奇偶分组 num % 2 } val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction) groupRDD.collect().foreach(println) sc.stop() } }
(0,CompactBuffer(2, 4)) (1,CompactBuffer(1, 3))
根据每个单词首字母分区
❖ 小功能:将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark06_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - groupBy val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2) // 分组和分区没有必然的关系 //根据每个单词首字母分区 val groupRDD = rdd.groupBy(_.charAt(0)) groupRDD.collect().foreach(println) sc.stop() } }
(H,CompactBuffer(Hello, Hadoop)) (S,CompactBuffer(Spark, Scala))
❖ 小功能:从服务器日志数据 apache.log 中获取每个小时问量。
package com.atguigu.bigdata.spark.core.rdd.operator.transform import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark06_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - groupBy val rdd = sc.textFile("datas/apache.log") val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map( line => { val datas = line.split(" ") val time = datas(3) //time.substring(0, ) val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val date: Date = sdf.parse(time) val sdf1 = new SimpleDateFormat("HH") val hour: String = sdf1.format(date) (hour, 1) } ).groupBy(_._1) timeRDD.map{ case ( hour, iter ) => { (hour, iter.size) } }.collect.foreach(println) sc.stop() } }
❖ 小功能:WordCount。
groupBay也可是实现wordConut ,先根据word分区再计算数组的长度。
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4 ),1) val dataRDD1 = dataRDD.filter(_%2 == 0)
过滤掉奇数,保留偶数
package com.atguigu.bigdata.spark.core.rdd.operator.transform import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark07_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.makeRDD(List(1,2,3,4)) //为true数据保留 val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0) filterRDD.collect().foreach(println) sc.stop() } }
小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark07_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.textFile("datas/apache.log") rdd.filter( line => { val datas = line.split(" ") val time = datas(3) //判断是否以"17/05/2015"开头 time.startsWith("17/05/2015") } ).collect().foreach(println) sc.stop() } }sample 抽样
// sample算子需要传递三个参数 // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃) // 2. 第二个参数表示, // 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念 // 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数 // 3. 第三个参数表示,抽取数据时随机算法的种子 // 如果不传递第三个参数,那么使用的是当前系统时间 // println(rdd.sample( // false, // 0.4 // //1 // ).collect().mkString(","))
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4 ),1) // 抽取数据不放回(伯努利算法) // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。 // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不 要 // 第一个参数:抽取的数据是否放回,false:不放回 // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; // 第三个参数:随机数种子 val dataRDD1 = dataRDD.sample(false, 0.5) // 抽取数据放回(泊松算法) // 第一个参数:抽取的数据是否放回,true:放回;false:不放回 // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数 // 第三个参数:随机数种子 val dataRDD2 = dataRDD.sample(true, 2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.{SparkConf, SparkContext} object Spark08_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10)) // sample算子需要传递三个参数 // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃) // 2. 第二个参数表示, // 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念 // 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数 // 3. 第三个参数表示,抽取数据时随机算法的种子 // 如果不传递第三个参数,那么使用的是当前系统时间 // println(rdd.sample( // false, // 0.4 // //1 // ).collect().mkString(",")) println(rdd.sample( false, 0.4 //1 ).collect().mkString(",")) sc.stop() } }
3,4,8distinct 去重
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),1) val dataRDD1 = dataRDD.distinct() val dataRDD2 = dataRDD.distinct(2)
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark09_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4)) // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1) // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null) // (1, null)(1, null)(1, null) // (null, null) => null // (1, null) => 1 val rdd1: RDD[Int] = rdd.distinct() rdd1.collect().foreach(println) sc.stop() } }coalesce 缩减分区
多次过滤后每个分区的数据都少了,需要合并数据缩减分区
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),6) val dataRDD1 = dataRDD.coalesce(2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark10_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3) // coalesce方法默认情况下不会将分区的数据打乱重新组合 // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜 // 如果想要让数据均衡,可以进行shuffle处理 //val newRDD: RDD[Int] = rdd.coalesce(2) val newRDD: RDD[Int] = rdd.coalesce(2,true) newRDD.saveAsTextFile("output3") sc.stop() } }
coalesce 默认情况是缩减分区,不打乱原有数据的顺序,有可能导致缩减分区后数据不均衡, 如果想要让数据均衡,可以进行shuffle处理,参数 shuffle 的默认值为 false
//如果想要让数据均衡,可以进行shuffle处理,参数 shuffle 的默认值为 false //val newRDD: RDD[Int] = rdd.coalesce(2) val newRDD: RDD[Int] = rdd.coalesce(2,true)repartition 增大分区
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),2) val dataRDD1 = dataRDD.repartition(4)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark11_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - filter val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2) // coalesce算子可以扩大分区的,但是如果不进行shuffle *** 作,不打乱数据顺序是没有意义,不起作用。 // 所以如果想要实现扩大分区的效果,需要使用shuffle *** 作 // spark提供了一个简化的 *** 作 // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle //val newRDD: RDD[Int] = rdd.coalesce(3, true) val newRDD: RDD[Int] = rdd.repartition(3) newRDD.saveAsTextFile("output") sc.stop() } }sortBy 根据指定规则排序
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2 ),2) //根据num排序 val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark12_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - sortBy val rdd = sc.makeRDD(List(6,2,4,5,3,1), 2) val newRDD: RDD[Int] = rdd.sortBy(num=>num) newRDD.collect().foreach(println) sc.stop() } }
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark12_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - sortBy val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2) // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式,ascending默认是true及升序,false为降序 // sortBy默认情况下,不会改变分区。但是中间存在shuffle *** 作, //Int格式排序 val newRDD = rdd.sortBy(t=>t._1.toInt, false) newRDD.collect().foreach(println) sc.stop() } }
//以原始String 类型排序 val newRDD = rdd.sortBy(t=>t._1, false)双 Value 类型
双 Value 类型指的是两个数据源之间的关联 *** 作
intersection union subtract 要求两个数据流的类型一致,zip可以不一致
- 交集,并集和差集要求两个数据源数据类型保持一致
- 拉链 *** 作两个数据源的类型可以不一致,但要求两个数据源要求分区数量和每个分区中的数据要保持一致
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) val dataRDD = dataRDD1.intersection(dataRDD2)union 并集
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) val dataRDD = dataRDD1.union(dataRDD2)
与数学中的并集不同的是 相同的数据不会只保留一个,spark中的并集只是把两个数据源直接放在一起。
1,2,3,4,3,4,5,6subtract差集
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) val dataRDD = dataRDD1.subtract(dataRDD2)zip拉链
// Can't zip RDDs with unequal numbers of partitions: List(2, 4)
- 两个数据源要求分区数量要保持一致
// Can only zip RDDs with same number of elements in each partition - 两个数据源要求分区中数据数量保持一致
(1,3),(2,4),(3,5),(4,6)
Spark中 拉链 *** 作与scala中不同的是两个数据源要求分区中数据数量保持一致
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark13_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) //scala 中的拉链 *** 作,数据长度可以不一致 val ids = List[Long](1,2,3,4,5,6,7) val ids1 = List[Long](2,3,4,5,6,7) //ids.tail=(2,3,4,5,6,7) val okflowIds: List[(Long, Long)] = ids.zip(ids1) // TODO 算子 - 双Value类型 // Can't zip RDDs with unequal numbers of partitions: List(2, 4) // 两个数据源要求分区数量要保持一致 // Can only zip RDDs with same number of elements in each partition // 两个数据源要求分区中数据数量保持一致 val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2) val rdd2 = sc.makeRDD(List(3,4,5,6),2) // 运行报错 val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) println(rdd6.collect().mkString(",")) sc.stop() } }
交集+并集+差集+拉链
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark13_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - 双Value类型 // 交集,并集和差集要求两个数据源数据类型保持一致 // 拉链 *** 作两个数据源的类型可以不一致 val rdd1 = sc.makeRDD(List(1,2,3,4)) val rdd2 = sc.makeRDD(List(3,4,5,6)) val rdd7 = sc.makeRDD(List("3","4","5","6")) // 交集 : 【3,4】 val rdd3: RDD[Int] = rdd1.intersection(rdd2) //类型不一致,编译不通过 //val rdd8 = rdd1.intersection(rdd7) println(rdd3.collect().mkString(",")) // 并集 : 【1,2,3,4,3,4,5,6】 val rdd4: RDD[Int] = rdd1.union(rdd2) println(rdd4.collect().mkString(",")) // 差集 : 【1,2】 val rdd5: RDD[Int] = rdd1.subtract(rdd2) // 差集 : 【1,2】 val rdd9: RDD[Int] = rdd2.subtract(rdd1) println(rdd5.collect().mkString(",")) println(rdd9.collect().mkString(",")) // 拉链 : 【1-3,2-4,3-5,4-6】 //(1,3),(2,4),(3,5),(4,6) val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) //zip可以类型不一致 val rdd8 = rdd1.zip(rdd7) println(rdd6.collect().mkString(",")) sc.stop() } }Key - Value 类型
要求数据流失(key, value)类型
partitionBy实现改变数据所在分区的位置
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3) import org.apache.spark.HashPartitioner
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} object Spark14_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List(1,2,3,4),2) val mapRDD:RDD[(Int, Int)] = rdd.map((_,1)) // RDD => PairRDDFunctions // 隐式转换(二次编译) // partitionBy根据指定的分区规则对数据进行重分区 val newRDD = mapRDD.partitionBy(new HashPartitioner(2)) newRDD.partitionBy(new HashPartitioner(2)) newRDD.saveAsTextFile("output") sc.stop() } }
如果重分区的分区器和当前 RDD 的分区器一样(分区器的类型和分区的数量一样)怎么办?
直接返回RDD 不会创建新的RDD
Spark 还有其他分区器吗
hash分区器和range(范围)分区器
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.reduceByKey(_+_) val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} object Spark15_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // reduceByKey : 相同的key的数据进行value数据的聚合 *** 作 // scala语言中一般的聚合 *** 作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合 // 【1,2,3】 // 【3,3】 // 【6】 // reduceByKey中如果key的数据只有一个,是不会参与运算的。 val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => { println(s"x = ${x}, y = ${y}") x + y } ) reduceRDD.collect().foreach(println) sc.stop() } }groupByKey
相同的key放的一个group
groupByky和reduceByKey的区别,reduceByKey有聚合(求相同keyd的数量)的作用,groupByky是把key放在一组
groupByKey:
(a,CompactBuffer(1, 2, 3)) (b,CompactBuffer(4))
educeByKey:
(a,6) (b,4)
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.groupByKey() val dataRDD3 = dataRDD1.groupByKey(2) val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark16_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组 // 元组中的第一个元素就是key, // 元组中的第二个元素就是相同key的value的集合 val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey() groupRDD.collect().foreach(println) val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1) groupRDD1.collect().foreach(println) sc.stop() } }
reduceByKey 和 groupByKey 的区别?
先对每个分区内做最大值判断,再分区间做聚合 *** 作
// TODO : 取出每个分区内相同 key 的最大值然后分区间相加 // aggregateByKey 算子是函数柯里化,存在两个参数列表 // 1. 第一个参数列表中的参数表示初始值 // 2. 第二个参数列表中含有两个参数 // 2.1 第一个参数表示分区内的计算规则 // 2.2 第二个参数表示分区间的计算规则 val rdd = sc.makeRDD(List( ("a",1),("a",2),("c",3), ("b",4),("c",5),("c",6) ),2) // 0:("a",1),("a",2),("c",3) => (a,10)(c,10) // => (a,10)(b,10)(c,20) // 1:("b",4),("c",5),("c",6) => (b,10)(c,10) val resultRDD = rdd.aggregateByKey(10)( (x, y) => math.max(x,y), (x, y) => x + y ) resultRDD.collect().foreach(println)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark17_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("a", 4) ),2) // (a,【1,2】), (a, 【3,4】) // (a, 2), (a, 4) // (a, 6) // aggregateByKey存在函数柯里化,有两个参数列表 // 第一个参数列表,需要传递一个参数,表示为初始值 // 主要用于当碰见第一个key的时候,和value进行分区内计算 // 第二个参数列表需要传递2个参数 // 第一个参数表示分区内计算规则 // 第二个参数表示分区间计算规则 // math.min(x, y) // math.max(x, y) rdd.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect.foreach(println) sc.stop() } }
(a,6)
reduceByKey和aggregateByKey的区别
reduceByKey:分区内和分区间的计算规则一样
aggregateByKey:分区内和分区间的计算规则不一样
分区内计算规则和分区间计算规则相同怎么办?
aggregateByKey 分区内和分区间的计算规则可以相同也可以不同。
计算规则相同
aggregateByKey最终的返回数据结果应该和初始值的类型保持一致:
获取相同key的数据的平均值
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark18_RDD_Operator_Transform3 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // aggregateByKey最终的返回数据结果应该和初始值的类型保持一致 //val aggRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _) //aggRDD.collect.foreach(println) // 获取相同key的数据的平均值 => (a, 3),(b, 4) val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )( ( t, v ) => { (t._1 + v, t._2 + 1) }, (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2) } ) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) sc.stop() } }
(b,4) (a,3)foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
import org.apache.spark.{SparkConf, SparkContext} object Spark17_RDD_Operator_Transform2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println) // 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法 rdd.foldByKey(0)(_+_).collect.foreach(println) sc.stop() } }
(b,12) (a,9)combineByKey
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) val input: RDD[(String, Int)] = sc.makeRDD(list, 2) val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey( (_, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) )
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
-
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
-
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相
同 -
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规
则可以不相同 -
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区
内和分区间计算规则不相同。
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark20_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //底层都是调用的相同的方法 combineByKeyWithClassTag rdd.reduceByKey(_+_) // wordcount rdd.aggregateByKey(0)(_+_, _+_) // wordcount rdd.foldByKey(0)(_+_) // wordcount rdd.combineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y) // wordcount sc.stop() } }join 谨慎使用OOM的风险
相当于sql表的内连接
key的类型必须相同 value类型可以不相同
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"))) val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd.join(rdd1).collect().foreach(println)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark21_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd1 = sc.makeRDD(List( ("a", 1), ("a", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 5), ("c", 6),("a", 4) )) // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组 // 如果两个数据源中key没有匹配上,那么数据不会出现在结果中 // 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。 val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2) joinRDD.collect().foreach(println) sc.stop() } }
两个不同数据源的数据,相同的key的value会连接在一起,形成元组
// TODO 算子 - (Key - Value类型) val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("c", 5), ("b", 6),("a", 4) )) // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组 val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,4)) (b,(2,6)) (c,(3,5))
如果两个数据源中key没有匹配上,那么数据不会出现在结果中
val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 5), ("c", 6),("d", 4) )) // join : // 如果两个数据源中key没有匹配上,那么数据不会出现在结果中 val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,5)) (c,(3,6))
如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
val rdd1 = sc.makeRDD(List( ("a", 1), ("a", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 5), ("c", 6),("a", 4) )) // 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。 val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,5)) (a,(1,4)) (a,(2,5)) (a,(2,4)) (c,(3,6))leftOuterJoin / rightOuterJoin
相当于sql表的外连接,及以一条数据流为主
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark22_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 4), ("b", 5)//,("c", 6) )) val leftJoinRDD = rdd1.leftOuterJoin(rdd2) // val rightJoinRDD = rdd1.rightOuterJoin(rdd2) leftJoinRDD.collect().foreach(println) //rightJoinRDD.collect().foreach(println) sc.stop() } }
(a,(1,Some(4))) (b,(2,Some(5))) (c,(3,None))
val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2)//, ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 4), ("b", 5),("c", 6) )) // val leftJoinRDD = rdd1.leftOuterJoin(rdd2) val rightJoinRDD = rdd1.rightOuterJoin(rdd2) rightJoinRDD.collect().foreach(println)
(a,(Some(1),4)) (b,(Some(2),5)) (c,(None,6))cogroup
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3))) val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark23_RDD_Operator_Transform { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子 - (Key - Value类型) val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2)//, ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 4), ("b", 5),("c", 6),("c", 7) )) // cogroup : connect + group (分组,连接) val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2) cgRDD.collect().foreach(println) sc.stop() } }
即使数据没有也会对其统计,所cogroup不要求两个流中必须存在相同的key
(a,(CompactBuffer(1),CompactBuffer(4))) (b,(CompactBuffer(2),CompactBuffer(5))) (c,(CompactBuffer(),CompactBuffer(6, 7)))案例实 ***
数据agent.log
1516609143867 6 7 64 16 1516609143869 9 4 75 18 1516609143869 1 7 87 12 1516609143869 2 8 92 9 1516609143869 6 7 84 24 1516609143869 1 8 95 5 1516609143869 8 1 90 29 1516609143869 3 3 36 16 1516609143869 3 3 54 22 1516609143869 7 6 33 5 1516609143869 8 2 91 27 1516609143869 0 5 66 5 1516609143869 1 3 33 6 1516609143869 6 2 97 21 1516609143869 5 2 95 24 1516609143869 8 9 73 11 1516609143869 4 8 62 15 1516609143869 5 5 40 23
整体思路
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark24_RDD_Req { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 案例实 *** // 1. 获取原始数据:时间戳,省份,城市,用户,广告 val dataRDD = sc.textFile("datas/agent.log") // 2. 将原始数据进行结构的转换。方便统计 // 时间戳,省份,城市,用户,广告 // => // ( ( 省份,广告 ), 1 ) val mapRDD = dataRDD.map( line => { val datas = line.split(" ") (( datas(1), datas(4) ), 1) } ) // 3. 将转换结构后的数据,进行分组聚合 // ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum ) val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_) // 4. 将聚合的结果进行结构的转换 // ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) val newMapRDD = reduceRDD.map{ case ( (prv, ad), sum ) => { (prv, (ad, sum)) } } // 5. 将转换结构后的数据根据省份进行分组 // ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 ) val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey() // 6. 将分组后的数据组内排序(降序),取前3名 val resultRDD = groupRDD.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3) } ) // 7. 采集数据打印在控制台 resultRDD.collect().foreach(println) sc.stop() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)