- 前言
- 一、样例数据
- 二、样例源码
- 三、总结
- 思路
- 问题
前言
以一个小例子:统计每个品类下最受欢迎的sku,来记录下spark分组排序的思路。
此代码是参考多易教育spark课程编写
一、样例数据
cate1,sku01 cate1,sku02 cate1,sku01 cate1,sku01 cate1,sku03 cate1,sku04 cate1,sku04 cate2,sku05 cate2,sku06 cate2,sku07 cate2,sku05 cate2,sku07 cate2,sku07 cate2,sku08 cate2,sku09 cate2,sku07 cate2,sku05 cate3,sku10 cate3,sku11 cate3,sku11 cate3,sku11 cate3,sku12 cate3,sku12 cate3,sku12二、样例源码
object Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("aa") val sc = new SparkContext(conf) // 1. 加载数据 val rdd1 = sc.textFile("./data/sku") // 2. 转换数据格式,字符串变元组: cate,sku => ((cate,sku), 1) val rdd2: RDD[((String, String), Int)] = rdd1.map(line => { ((line.split(",")(0), line.split(",")(1)), 1) }) // 3. 统计每个sku的销量 val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _) // 4. 由于需要计算每个品类最受欢迎的sku,所以先得按品类分组 val rdd4 = rdd3.groupBy(_._1._1) // 5. 将每个组的元素按照倒序排序,取出第一个元素 val rdd5 = rdd4.flatMapValues( _.toList.sortBy(-_._2).take(1)) // 6. 调整数据的输出格式,没有实际计算逻辑 val rdd6 = rdd5.map(x => (x._1, x._2._1._2, x._2._2)) rdd6.foreach(println) } }三、总结 思路
- 按照二次排序的key进行求和, 得到形如(k1,k2,value)的数据
- 按照一次排序的key进行分组,得到形如(k1,Array(k2 value))的数据
- 组内排序。组内排序需要将数据转化为scala的数组,然后在executor本地排序
val rdd5 = rdd4.flatMapValues(_.toList.sortBy(-_._2).take(1))
这行代码有点问题。_.toList直接将数据全部放到内存,可能会产生oom。如何修改,请看spark分区排序二
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)