spark分组排序一

spark分组排序一,第1张

spark分组排序

文章目录
  • 前言
  • 一、样例数据
  • 二、样例源码
  • 三、总结
    • 思路
    • 问题


前言

以一个小例子:统计每个品类下最受欢迎的sku,来记录下spark分组排序的思路。
此代码是参考多易教育spark课程编写


一、样例数据
cate1,sku01
cate1,sku02
cate1,sku01
cate1,sku01
cate1,sku03
cate1,sku04
cate1,sku04
cate2,sku05
cate2,sku06
cate2,sku07
cate2,sku05
cate2,sku07
cate2,sku07
cate2,sku08
cate2,sku09
cate2,sku07
cate2,sku05
cate3,sku10
cate3,sku11
cate3,sku11
cate3,sku11
cate3,sku12
cate3,sku12
cate3,sku12
二、样例源码
object Demo1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("aa")

    val sc = new SparkContext(conf)

    // 1. 加载数据
    val rdd1 = sc.textFile("./data/sku")

    // 2. 转换数据格式,字符串变元组: cate,sku => ((cate,sku), 1)
    val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
      ((line.split(",")(0), line.split(",")(1)), 1)
    })

    // 3. 统计每个sku的销量
    val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)

    // 4. 由于需要计算每个品类最受欢迎的sku,所以先得按品类分组
    val rdd4 = rdd3.groupBy(_._1._1)

    // 5. 将每个组的元素按照倒序排序,取出第一个元素
    val rdd5 = rdd4.flatMapValues( _.toList.sortBy(-_._2).take(1))

    // 6. 调整数据的输出格式,没有实际计算逻辑
    val rdd6 = rdd5.map(x => (x._1, x._2._1._2, x._2._2))

    rdd6.foreach(println)
  }

}
三、总结 思路
  1. 按照二次排序的key进行求和, 得到形如(k1,k2,value)的数据
  2. 按照一次排序的key进行分组,得到形如(k1,Array(k2 value))的数据
  3. 组内排序。组内排序需要将数据转化为scala的数组,然后在executor本地排序
问题
val rdd5 = rdd4.flatMapValues(_.toList.sortBy(-_._2).take(1))

这行代码有点问题。_.toList直接将数据全部放到内存,可能会产生oom。如何修改,请看spark分区排序二

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678815.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存