Spark程序设计进阶

Spark程序设计进阶,第1张

竞赛网站访问日志分析
/**
 * Prints three statistics about the access log `data/contest_log.txt`, whose lines
 * are comma-separated: the number of distinct values in column 3, the number of
 * distinct values in column 1, and the number of lines per 7-character prefix of
 * column 5 (columns are 0-based).
 *
 * NOTE(review): judging by the original variable names these columns are presumably
 * user, page and a timestamp (so the prefix would be a "yyyy-MM" month) — confirm
 * against the log format.
 *
 * @param utils provides the SparkContext used to read the log
 */
def contest(utils: Utils): Unit = {
  // Split every line once and address the columns by index afterwards.
  val columns = utils.sc.textFile("data/contest_log.txt").map(_.split(','))

  val distinctUsers = columns.map(cols => cols(3)).distinct().count()
  println(distinctUsers)

  val distinctPages = columns.map(cols => cols(1)).distinct().count()
  println(distinctPages)

  val linesPerPrefix = columns
    .map(cols => (cols(5).substring(0, 7), 1))
    .reduceByKey(_ + _)
    .collect()
  // One "(prefix,count)" entry per line, with a leading and a trailing newline.
  println(linesPerPrefix.mkString("\n", "\n", "\n"))
}
影评 1. 求被评分次数最多的 10 部电影,并给出评分次数(电影名,评分次数)

2. 分别求男性,女性当中评分最高的 10 部电影(性别,电影名,影评分)
/**
 * 2. For women ("F") and men ("M") separately, prints the 10 movies with the highest
 * average rating, one per line as `sex:title:average`.
 *
 * The average is taken over all ratings given to a movie by users of that sex.
 * Ranking by average (rather than by individual rating rows, which only yields ten
 * arbitrary rows rated "5") is what the task statement asks for.
 *
 * NOTE(review): no minimum number of ratings is required, so a movie with a single
 * 5-star rating ranks first. The draft that used to be commented out here only
 * considered movies with more than 50 ratings — confirm whether that threshold is wanted.
 *
 * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
 */
def solveQuest2(utils: Utils): Unit = {
  // (userID, sex)
  val sexByUser: RDD[(String, String)] = utils.usersRdd.map(u => (u._1, u._2))
  // (userID, (movieID, rating))
  val ratingByUser: RDD[(String, (String, Double))] =
    utils.ratingsRdd.map(r => (r._1, (r._2, r._3.toDouble)))
  // (movieID, title)
  val titleByMovie: RDD[(String, String)] = utils.movieRdd.map(m => (m._1, m._2))

  // ((sex, movieID), average rating)
  val avgBySexAndMovie: RDD[((String, String), Double)] = sexByUser.join(ratingByUser)
    .map { case (_, (sex, (movieId, rating))) => ((sex, movieId), (rating, 1)) }
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues { case (sum, count) => sum / count }

  // (movieID, (sex, avg)) joined with the titles ---> (sex, title, avg)
  val sexTitleAvg: RDD[(String, String, Double)] = avgBySexAndMovie
    .map { case ((sex, movieId), avg) => (movieId, (sex, avg)) }
    .join(titleByMovie)
    .map { case (_, ((sex, avg), title)) => (sex, title, avg) }

  // Same output order as before: women first, then men.
  for (sex <- Seq("F", "M")) {
    sexTitleAvg.filter(_._1 == sex)
      .top(10)(Ordering.by(_._3))
      .foreach { case (s, title, avg) => println(s + ":" + title + ":" + avg) }
  }
}
3. 分别求男性,女性看过评分次数最多的 10 部电影(性别,电影名)
  /**
   * 3. For men and women separately, prints the 10 movies that received the most
   * ratings from users of that sex, as `((movieID, title), count)`.
   *
   * The female list is printed under its own "female ..." heading; it used to be
   * printed under a second "male ..." heading.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest3(utils: Utils): Unit = {
    // (userID, sex)
    val sexByUser = utils.usersRdd.map(u => (u._1, u._2))
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map(m => (m._1, m._2))

    // One (movieID, sex) record per rating.
    val sexByRatedMovie = utils.ratingsRdd.map(r => (r._1, r._2))
      .join(sexByUser)
      .map { case (_, (movieId, sex)) => (movieId, sex) }

    // ((sex, movieID, title), number of ratings); ratings of unknown movies drop out in the join.
    val counts = sexByRatedMovie.join(titleByMovie)
      .map { case (movieId, (sex, title)) => ((sex, movieId, title), 1) }
      .reduceByKey(_ + _)

    for ((sex, label) <- Seq(("M", "male"), ("F", "female"))) {
      println(label + " rating times top10")
      counts.filter(_._1._1 == sex)
        .map { case ((_, movieId, title), times) => ((movieId, title), times) }
        .top(10)(Ordering.by(_._2))
        .foreach(println(_))
    }
  }
4. 年龄段在“18-24”的男人,最喜欢看(评分次数最多的)10部电影
  /**
   * 4. Prints the 10 movies rated most often by male users whose age field is "18"
   * (the heading printed below labels this group "18-24"), as
   * `((movieID, title), count)`.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest4(utils: Utils): Unit = {
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map { case (movieId, title, _) => (movieId, title) }
    // (userID, age) restricted to the target group
    val youngMen = utils.usersRdd
      .filter { case (_, sex, age, _, _) => sex == "M" && age == "18" }
      .map { case (userId, _, age, _, _) => (userId, age) }
    // (userID, (movieID, 1)) for every rating
    val ratedMovieByUser = utils.ratingsRdd.map { case (userId, movieId, _, _) => (userId, (movieId, 1)) }

    // Keep only the ratings made by the target group, re-keyed by movie.
    val hits = ratedMovieByUser.join(youngMen)
      .map { case (userId, ((movieId, one), age)) => (movieId, (userId, one, age)) }
    val countByMovie = hits.join(titleByMovie)
      .map { case (movieId, ((_, one, _), title)) => ((movieId, title), one) }
      .reduceByKey(_ + _)

    println("age in 18-24 male rating times top10")
    countByMovie.top(10)(Ordering.by(_._2)).foreach(println(_))
  }
5. 求 movieid = 2116 这部电影各年龄段(因为年龄就只有 7 个,就按这个 7 个分就好了)的平均影评(年龄段,影评分)
  /**
   * 5. Prints the average rating of the movie with id "2116" for each age group,
   * as `(age, average)`.
   *
   * The result is collected before printing so the lines always appear on the
   * driver's console; `RDD.foreach(println)` prints on the executors, which only
   * coincides with the driver in local mode.
   *
   * @param utils provides `usersRdd` and `ratingsRdd`
   */
  def solveQuest5(utils: Utils): Unit = {
    // (userID, rating) for the one movie of interest
    val ratingByUser = utils.ratingsRdd
      .filter(_._2 == "2116")
      .map(r => (r._1, r._3.toDouble))
    // (userID, age)
    val ageByUser = utils.usersRdd.map(u => (u._1, u._3))

    // (age, (sum of ratings, number of ratings)) ---> (age, average)
    val avgByAge = ageByUser.join(ratingByUser)
      .map { case (_, (age, rating)) => (age, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    println("movie 2116 in every age avg")
    // At most one record per distinct age value, so collecting is cheap.
    avgByAge.collect().foreach(println(_))
  }
6. 求最喜欢看电影(影评次数最多)的那位女性评最高分的 10 部电影的平均影评分(观影者,电影名,影评分)
  /**
   * 6. Finds the female user with the most ratings and prints the 10 movies she
   * rated highest, as `(movieID, title, her rating)`.
   *
   * NOTE(review): the task statement asks for (viewer, title, average rating); this
   * prints the movie id instead of the viewer and her own rating instead of an
   * average — confirm which output is wanted.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest6(utils: Utils): Unit = {
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map(m => (m._1, m._2))
    // (userID, sex)
    val sexByUser = utils.usersRdd.map(u => (u._1, u._2))
    // (userID, (movieID, 1)) for every rating
    val ratedMovieByUser = utils.ratingsRdd.map(r => (r._1, (r._2, 1)))

    // Ratings made by women, re-keyed by movie: (movieID, (userID, 1))
    val womenRatings = ratedMovieByUser.join(sexByUser)
      .filter { case (_, (_, sex)) => sex == "F" }
      .map { case (userId, ((movieId, one), _)) => (movieId, (userId, one)) }
    // Only ratings of movies present in movieRdd are counted: (userID, count)
    val countByWoman = womenRatings.join(titleByMovie)
      .map { case (_, ((userId, one), _)) => (userId, one) }
      .reduceByKey(_ + _)
    // Indexing with (0) fails if there is no female rating at all.
    val mostActive = countByWoman.top(1)(Ordering.by(_._2))
    val uid = mostActive(0)._1

    // (movieID, rating) given by that user
    val herRatings = utils.ratingsRdd.filter(_._1 == uid).map(r => (r._2, r._3))
    val herRatedMovies = titleByMovie.join(herRatings)
      .map { case (movieId, (title, rating)) => (movieId, title, rating.toDouble) }
    val top10 = herRatedMovies.top(10)(Ordering.by(_._3))

    println("movie fav F highest raing top 10")
    top10.foreach(println(_))
  }
7. 求好片(平均评分>=4.0)最多的那个年份的最好看(平均评分最高)的 10 部电影
  /**
   * 7. Finds the year with the most "good" movies (average rating >= 4.0) and prints
   * the 10 best-rated good movies of that year, as `(name, average)`.
   *
   * The name is the title without its last 7 characters and the year is the 4
   * characters before the title's final character, i.e. titles are assumed to end in
   * " (YYYY)" — TODO confirm for every row of movies.dat.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest7(utils: Utils): Unit = {
    // (movieID, name without the trailing year part)
    val nameByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, title.substring(0, title.length - 7))
    }
    // (movieID, year)
    val yearByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, title.substring(title.length - 5, title.length - 1))
    }
    // (movieID, average rating), good movies only
    val goodAvgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (movieId, (sum, count)) => (movieId, sum / count) }
      .filter(_._2 >= 4.0)

    // (movieID, (year, average)) — used both to pick the year and to list its movies.
    val goodWithYear = yearByMovie.join(goodAvgByMovie)
    val goodCountByYear = goodWithYear
      .map { case (_, (year, _)) => (year, 1) }
      .reduceByKey(_ + _)
    // Indexing with (0) fails if no movie reaches an average of 4.0.
    val year = goodCountByYear.top(1)(Ordering.by(_._2))(0)._1

    val avgInYear = goodWithYear
      .filter(_._2._1 == year)
      .map { case (movieId, (_, avg)) => (movieId, avg) }
    val top10 = avgInYear.join(nameByMovie)
      .map { case (_, (avg, name)) => (name, avg) }
      .top(10)(Ordering.by(_._2))
    top10.foreach(println(_))
  }
8.求 1997 年上映的电影中,评分最高的 10 部 Comedy 类电影
  /**
   * 8. Prints the 10 movies with the highest average rating among those whose year is
   * "1997" and whose genre string contains "Comedy", as `(movieID, name, average)`.
   *
   * Name and year are cut out of the title by position (last 7 characters removed /
   * the 4 characters before the final one), so titles are assumed to end in
   * " (YYYY)" — TODO confirm.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest8(utils: Utils): Unit = {
    // (movieID, (name, year, genres)) restricted to 1997 comedies
    val comedies1997 = utils.movieRdd
      .map { m =>
        val title = m._2
        val name = title.substring(0, title.length - 7)
        val year = title.substring(title.length - 5, title.length - 1)
        (m._1, (name, year, m._3))
      }
      .filter { case (_, (_, year, genres)) => year == "1997" && genres.contains("Comedy") }

    // (movieID, average rating) over all movies
    val avgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (movieId, (sum, count)) => (movieId, sum / count) }

    val ranked = comedies1997.join(avgByMovie)
      .map { case (movieId, ((name, _, _), avg)) => (movieId, name, avg) }
    ranked.top(10)(Ordering.by(_._3)).foreach(println(_))
  }
9. 该影评库中各种类型电影中评价最高的 5 部电影(类型,电影名,平均影评分)
  /**
   * 9. For every genre, prints a "top5 in : genre" heading followed by the (up to) 5
   * movies of that genre with the highest average rating, as `(title, average)`.
   *
   * A movie counts for every genre in its '|'-separated genre field. Iterating over
   * all genres fixes an off-by-one (`0 until length - 1`) that skipped each movie's
   * last genre and therefore dropped single-genre movies completely.
   *
   * The top 5 are selected per key inside Spark, so only 5 records per genre reach
   * the driver and no extra RDD is created per genre.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest9(utils: Utils): Unit = {
    // (movieID, average rating)
    val avgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // (genre, (title, average)) — one record per genre the movie is tagged with
    val ratedByGenre = utils.movieRdd
      .map(m => (m._1, (m._2, m._3.split('|'))))
      .join(avgByMovie)
      .flatMap { case (_, ((title, genres), avg)) => genres.map(genre => (genre, (title, avg))) }

    // Keep a running best-5 list (highest average first) per genre.
    val top5ByGenre = ratedByGenre
      .aggregateByKey(List.empty[(String, Double)])(
        (best, movie) => (movie :: best).sortBy(-_._2).take(5),
        (left, right) => (left ++ right).sortBy(-_._2).take(5))
      .collect()

    top5ByGenre.foreach { case (genre, best) =>
      println("top5 in : " + genre)
      best.foreach(println(_))
    }
  }
10. 各年评分最高的电影类型(年份,类型,影评分)
  /**
   * 10. For every year, in ascending order, prints the movie type with the highest
   * average rating together with that average.
   *
   * The average is taken over all individual ratings of the movies of that year and
   * type. Everything is computed in one Spark job keyed by (year, type) instead of one
   * job per year. Years in which no movie has a rating are skipped rather than
   * failing on an empty `top(1)` result.
   *
   * NOTE(review): the "type" is the complete genre field (e.g. several genres joined
   * by '|'), not each single genre — confirm that this is the intended grouping.
   * The year is the 4 characters before the title's final character, i.e. titles are
   * assumed to end in "(YYYY)" — TODO confirm.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest0(utils: Utils): Unit = {
    // (movieID, (year, type))
    val yearAndTypeByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, (title.substring(title.length - 5, title.length - 1), m._3))
    }

    // ((year, type), average rating)
    val avgByYearAndType = utils.ratingsRdd
      .map(r => (r._2, r._3.toDouble))
      .join(yearAndTypeByMovie)
      .map { case (_, (rating, yearAndType)) => (yearAndType, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // (year, (type, average)) keeping only the best type of each year
    val bestByYear = avgByYearAndType
      .map { case ((year, movieType), avg) => (year, (movieType, avg)) }
      .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
      .collect()
      .sortBy(_._1.toInt)

    bestByYear.foreach { case (year, (movieType, avg)) =>
      println("In " + year + ", the highest rating movie type is " + movieType + " with average rating as " + avg)
    }
  }
完整代码
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * Bundles a local-mode SparkContext with the three data files under `ml-1m/`,
 * exposed both as raw text lines and as tuples of their "::"-separated fields.
 *
 * Constructing an instance creates the SparkContext immediately; the RDDs themselves
 * are only read when an action runs on them.
 *
 * As used by the queries in this file: `movieRdd` is (movieID, title, genres),
 * `ratingsRdd` starts with (userID, movieID, rating) and `usersRdd` starts with
 * (userID, sex, age). The remaining fields are not used in this file.
 */
class Utils {
  // Local master, application name "FileReview".
  val conf: SparkConf = new SparkConf().setAppName("FileReview").setMaster("local")
  // The SparkContext shared by every query.
  val sc: SparkContext = new SparkContext(conf)

  // Raw lines of the three input files.
  val movie: RDD[String] = sc.textFile("ml-1m/movies.dat")
  val ratings: RDD[String] = sc.textFile("ml-1m/ratings.dat")
  val users: RDD[String] = sc.textFile("ml-1m/users.dat")

  // First three "::"-separated fields of every movie line.
  val movieRdd: RDD[(String, String, String)] = movie.map { line =>
    val f = line.split("::")
    (f(0), f(1), f(2))
  }
  // First four "::"-separated fields of every rating line.
  val ratingsRdd: RDD[(String, String, String, String)] = ratings.map { line =>
    val f = line.split("::")
    (f(0), f(1), f(2), f(3))
  }
  // First five "::"-separated fields of every user line.
  val usersRdd: RDD[(String, String, String, String, String)] = users.map { line =>
    val f = line.split("::")
    (f(0), f(1), f(2), f(3), f(4))
  }
}

/**
 * Spark RDD exercises: one statistic over a contest access log plus ten queries over
 * the users / movies / ratings data exposed by [[Utils]].
 *
 * Every solver prints its result to standard output and returns nothing. As used
 * throughout this object, `movieRdd` is (movieID, title, genres), `ratingsRdd` starts
 * with (userID, movieID, rating) and `usersRdd` starts with (userID, sex, age).
 */
object four {

  /**
   * Prints three statistics about the access log `data/contest_log.txt`, whose lines
   * are comma-separated: the number of distinct values in column 3, the number of
   * distinct values in column 1, and the number of lines per 7-character prefix of
   * column 5 (columns are 0-based).
   *
   * NOTE(review): these columns are presumably user, page and a timestamp (so the
   * prefix would be a "yyyy-MM" month) — confirm against the log format.
   *
   * @param utils provides the SparkContext used to read the log
   */
  def contest(utils: Utils): Unit = {
    // Split every line once and address the columns by index afterwards.
    val columns = utils.sc.textFile("data/contest_log.txt").map(_.split(','))

    val distinctUsers = columns.map(cols => cols(3)).distinct().count()
    println(distinctUsers)

    val distinctPages = columns.map(cols => cols(1)).distinct().count()
    println(distinctPages)

    val linesPerPrefix = columns
      .map(cols => (cols(5).substring(0, 7), 1))
      .reduceByKey(_ + _)
      .collect()
    // One "(prefix,count)" entry per line, with a leading and a trailing newline.
    println(linesPerPrefix.mkString("\n", "\n", "\n"))
  }

  /**
   * 1. Prints the 10 movies with the most ratings, as `(title, number of ratings)`.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest1(utils: Utils): Unit = {
    // (movieID, number of ratings)
    val timesByMovie: RDD[(String, Int)] = utils.ratingsRdd.map(r => (r._2, 1)).reduceByKey(_ + _)
    // (movieID, title)
    val titleByMovie: RDD[(String, String)] = utils.movieRdd.map(m => (m._1, m._2))
    // Rank after the join: an ordering established before it would not survive the join.
    val result: RDD[(String, Int)] = timesByMovie.join(titleByMovie)
      .map { case (_, (times, title)) => (title, times) }
    // top(10) selects the ten largest counts without sorting the whole data set.
    result.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /**
   * 2. For women ("F") and men ("M") separately, prints the 10 movies with the highest
   * average rating, one per line as `sex:title:average`.
   *
   * The average is taken over all ratings given to a movie by users of that sex.
   * Ranking by average (rather than by individual rating rows, which only yields ten
   * arbitrary rows rated "5") is what the task statement asks for.
   *
   * NOTE(review): no minimum number of ratings is required, so a movie with a single
   * 5-star rating ranks first. The draft that used to be commented out here only
   * considered movies with more than 50 ratings — confirm whether that threshold is wanted.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest2(utils: Utils): Unit = {
    // (userID, sex)
    val sexByUser: RDD[(String, String)] = utils.usersRdd.map(u => (u._1, u._2))
    // (userID, (movieID, rating))
    val ratingByUser: RDD[(String, (String, Double))] =
      utils.ratingsRdd.map(r => (r._1, (r._2, r._3.toDouble)))
    // (movieID, title)
    val titleByMovie: RDD[(String, String)] = utils.movieRdd.map(m => (m._1, m._2))

    // ((sex, movieID), average rating)
    val avgBySexAndMovie: RDD[((String, String), Double)] = sexByUser.join(ratingByUser)
      .map { case (_, (sex, (movieId, rating))) => ((sex, movieId), (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // (movieID, (sex, avg)) joined with the titles ---> (sex, title, avg)
    val sexTitleAvg: RDD[(String, String, Double)] = avgBySexAndMovie
      .map { case ((sex, movieId), avg) => (movieId, (sex, avg)) }
      .join(titleByMovie)
      .map { case (_, ((sex, avg), title)) => (sex, title, avg) }

    // Same output order as before: women first, then men.
    for (sex <- Seq("F", "M")) {
      sexTitleAvg.filter(_._1 == sex)
        .top(10)(Ordering.by(_._3))
        .foreach { case (s, title, avg) => println(s + ":" + title + ":" + avg) }
    }
  }

  /**
   * 3. For men and women separately, prints the 10 movies that received the most
   * ratings from users of that sex, as `((movieID, title), count)`.
   *
   * The female list is printed under its own "female ..." heading; it used to be
   * printed under a second "male ..." heading.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest3(utils: Utils): Unit = {
    // (userID, sex)
    val sexByUser = utils.usersRdd.map(u => (u._1, u._2))
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map(m => (m._1, m._2))

    // One (movieID, sex) record per rating.
    val sexByRatedMovie = utils.ratingsRdd.map(r => (r._1, r._2))
      .join(sexByUser)
      .map { case (_, (movieId, sex)) => (movieId, sex) }

    // ((sex, movieID, title), number of ratings); ratings of unknown movies drop out in the join.
    val counts = sexByRatedMovie.join(titleByMovie)
      .map { case (movieId, (sex, title)) => ((sex, movieId, title), 1) }
      .reduceByKey(_ + _)

    for ((sex, label) <- Seq(("M", "male"), ("F", "female"))) {
      println(label + " rating times top10")
      counts.filter(_._1._1 == sex)
        .map { case ((_, movieId, title), times) => ((movieId, title), times) }
        .top(10)(Ordering.by(_._2))
        .foreach(println(_))
    }
  }

  /**
   * 4. Prints the 10 movies rated most often by male users whose age field is "18"
   * (the heading printed below labels this group "18-24"), as
   * `((movieID, title), count)`.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest4(utils: Utils): Unit = {
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map { case (movieId, title, _) => (movieId, title) }
    // (userID, age) restricted to the target group
    val youngMen = utils.usersRdd
      .filter { case (_, sex, age, _, _) => sex == "M" && age == "18" }
      .map { case (userId, _, age, _, _) => (userId, age) }
    // (userID, (movieID, 1)) for every rating
    val ratedMovieByUser = utils.ratingsRdd.map { case (userId, movieId, _, _) => (userId, (movieId, 1)) }

    // Keep only the ratings made by the target group, re-keyed by movie.
    val hits = ratedMovieByUser.join(youngMen)
      .map { case (_, ((movieId, one), _)) => (movieId, one) }
    val countByMovie = hits.join(titleByMovie)
      .map { case (movieId, (one, title)) => ((movieId, title), one) }
      .reduceByKey(_ + _)

    println("age in 18-24 male rating times top10")
    countByMovie.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /**
   * 5. Prints the average rating of the movie with id "2116" for each age group,
   * as `(age, average)`.
   *
   * The result is collected before printing so the lines always appear on the
   * driver's console; `RDD.foreach(println)` prints on the executors, which only
   * coincides with the driver in local mode.
   *
   * @param utils provides `usersRdd` and `ratingsRdd`
   */
  def solveQuest5(utils: Utils): Unit = {
    // (userID, rating) for the one movie of interest
    val ratingByUser = utils.ratingsRdd
      .filter(_._2 == "2116")
      .map(r => (r._1, r._3.toDouble))
    // (userID, age)
    val ageByUser = utils.usersRdd.map(u => (u._1, u._3))

    // (age, (sum of ratings, number of ratings)) ---> (age, average)
    val avgByAge = ageByUser.join(ratingByUser)
      .map { case (_, (age, rating)) => (age, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    println("movie 2116 in every age avg")
    // At most one record per distinct age value, so collecting is cheap.
    avgByAge.collect().foreach(println(_))
  }

  /**
   * 6. Finds the female user with the most ratings and prints the 10 movies she
   * rated highest, as `(movieID, title, her rating)`.
   *
   * NOTE(review): the task statement asks for (viewer, title, average rating); this
   * prints the movie id instead of the viewer and her own rating instead of an
   * average — confirm which output is wanted.
   *
   * @param utils provides `usersRdd`, `ratingsRdd` and `movieRdd`
   */
  def solveQuest6(utils: Utils): Unit = {
    // (movieID, title)
    val titleByMovie = utils.movieRdd.map(m => (m._1, m._2))
    // (userID, sex)
    val sexByUser = utils.usersRdd.map(u => (u._1, u._2))
    // (userID, (movieID, 1)) for every rating
    val ratedMovieByUser = utils.ratingsRdd.map(r => (r._1, (r._2, 1)))

    // Ratings made by women, re-keyed by movie: (movieID, (userID, 1))
    val womenRatings = ratedMovieByUser.join(sexByUser)
      .filter { case (_, (_, sex)) => sex == "F" }
      .map { case (userId, ((movieId, one), _)) => (movieId, (userId, one)) }
    // Only ratings of movies present in movieRdd are counted: (userID, count)
    val countByWoman = womenRatings.join(titleByMovie)
      .map { case (_, ((userId, one), _)) => (userId, one) }
      .reduceByKey(_ + _)
    // Indexing with (0) fails if there is no female rating at all.
    val mostActive = countByWoman.top(1)(Ordering.by(_._2))
    val uid = mostActive(0)._1

    // (movieID, rating) given by that user
    val herRatings = utils.ratingsRdd.filter(_._1 == uid).map(r => (r._2, r._3))
    val herRatedMovies = titleByMovie.join(herRatings)
      .map { case (movieId, (title, rating)) => (movieId, title, rating.toDouble) }
    val top10 = herRatedMovies.top(10)(Ordering.by(_._3))

    println("movie fav F highest raing top 10")
    top10.foreach(println(_))
  }

  /**
   * 7. Finds the year with the most "good" movies (average rating >= 4.0) and prints
   * the 10 best-rated good movies of that year, as `(name, average)`.
   *
   * The name is the title without its last 7 characters and the year is the 4
   * characters before the title's final character, i.e. titles are assumed to end in
   * " (YYYY)" — TODO confirm for every row of movies.dat.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest7(utils: Utils): Unit = {
    // (movieID, name without the trailing year part)
    val nameByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, title.substring(0, title.length - 7))
    }
    // (movieID, year)
    val yearByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, title.substring(title.length - 5, title.length - 1))
    }
    // (movieID, average rating), good movies only
    val goodAvgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (movieId, (sum, count)) => (movieId, sum / count) }
      .filter(_._2 >= 4.0)

    // (movieID, (year, average)) — used both to pick the year and to list its movies.
    val goodWithYear = yearByMovie.join(goodAvgByMovie)
    val goodCountByYear = goodWithYear
      .map { case (_, (year, _)) => (year, 1) }
      .reduceByKey(_ + _)
    // Indexing with (0) fails if no movie reaches an average of 4.0.
    val year = goodCountByYear.top(1)(Ordering.by(_._2))(0)._1

    val avgInYear = goodWithYear
      .filter(_._2._1 == year)
      .map { case (movieId, (_, avg)) => (movieId, avg) }
    val top10 = avgInYear.join(nameByMovie)
      .map { case (_, (avg, name)) => (name, avg) }
      .top(10)(Ordering.by(_._2))
    top10.foreach(println(_))
  }

  /**
   * 8. Prints the 10 movies with the highest average rating among those whose year is
   * "1997" and whose genre string contains "Comedy", as `(movieID, name, average)`.
   *
   * Name and year are cut out of the title by position, so titles are assumed to end
   * in " (YYYY)" — TODO confirm.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest8(utils: Utils): Unit = {
    // (movieID, (name, year, genres)) restricted to 1997 comedies
    val comedies1997 = utils.movieRdd
      .map { m =>
        val title = m._2
        val name = title.substring(0, title.length - 7)
        val year = title.substring(title.length - 5, title.length - 1)
        (m._1, (name, year, m._3))
      }
      .filter { case (_, (_, year, genres)) => year == "1997" && genres.contains("Comedy") }

    // (movieID, average rating) over all movies
    val avgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (movieId, (sum, count)) => (movieId, sum / count) }

    val ranked = comedies1997.join(avgByMovie)
      .map { case (movieId, ((name, _, _), avg)) => (movieId, name, avg) }
    ranked.top(10)(Ordering.by(_._3)).foreach(println(_))
  }

  /**
   * 9. For every genre, prints a "top5 in : genre" heading followed by the (up to) 5
   * movies of that genre with the highest average rating, as `(title, average)`.
   *
   * A movie counts for every genre in its '|'-separated genre field. Iterating over
   * all genres fixes an off-by-one (`0 until length - 1`) that skipped each movie's
   * last genre and therefore dropped single-genre movies completely.
   *
   * The top 5 are selected per key inside Spark, so only 5 records per genre reach
   * the driver and no extra RDD is created per genre.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest9(utils: Utils): Unit = {
    // (movieID, average rating)
    val avgByMovie = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // (genre, (title, average)) — one record per genre the movie is tagged with
    val ratedByGenre = utils.movieRdd
      .map(m => (m._1, (m._2, m._3.split('|'))))
      .join(avgByMovie)
      .flatMap { case (_, ((title, genres), avg)) => genres.map(genre => (genre, (title, avg))) }

    // Keep a running best-5 list (highest average first) per genre.
    val top5ByGenre = ratedByGenre
      .aggregateByKey(List.empty[(String, Double)])(
        (best, movie) => (movie :: best).sortBy(-_._2).take(5),
        (left, right) => (left ++ right).sortBy(-_._2).take(5))
      .collect()

    top5ByGenre.foreach { case (genre, best) =>
      println("top5 in : " + genre)
      best.foreach(println(_))
    }
  }

  /**
   * 10. For every year, in ascending order, prints the movie type with the highest
   * average rating together with that average.
   *
   * The average is taken over all individual ratings of the movies of that year and
   * type. Everything is computed in one Spark job keyed by (year, type) instead of one
   * job per year. Years in which no movie has a rating are skipped rather than
   * failing on an empty `top(1)` result.
   *
   * NOTE(review): the "type" is the complete genre field (e.g. several genres joined
   * by '|'), not each single genre — confirm that this is the intended grouping.
   * The year is cut out of the title by position, so titles are assumed to end in
   * "(YYYY)" — TODO confirm.
   *
   * @param utils provides `ratingsRdd` and `movieRdd`
   */
  def solveQuest0(utils: Utils): Unit = {
    // (movieID, (year, type))
    val yearAndTypeByMovie = utils.movieRdd.map { m =>
      val title = m._2
      (m._1, (title.substring(title.length - 5, title.length - 1), m._3))
    }

    // ((year, type), average rating)
    val avgByYearAndType = utils.ratingsRdd
      .map(r => (r._2, r._3.toDouble))
      .join(yearAndTypeByMovie)
      .map { case (_, (rating, yearAndType)) => (yearAndType, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // (year, (type, average)) keeping only the best type of each year
    val bestByYear = avgByYearAndType
      .map { case ((year, movieType), avg) => (year, (movieType, avg)) }
      .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
      .collect()
      .sortBy(_._1.toInt)

    bestByYear.foreach { case (year, (movieType, avg)) =>
      println("In " + year + ", the highest rating movie type is " + movieType + " with average rating as " + avg)
    }
  }

  /**
   * Entry point: creates the shared [[Utils]] (and with it the SparkContext) and
   * stops the context on exit.
   *
   * No query is invoked here; add calls to the solver(s) to run inside the `try`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val utils = new Utils()
    try {
      // Call the solver(s) to run here, e.g. solveQuest1(utils).
    } finally {
      // Release the SparkContext even if a query fails.
      utils.sc.stop()
    }
  }
}
实验总结及问题 学会使用什么做什么事情

Spark RDD 复杂操作

遇到什么问题,如何解决

flatMap 与 reduceByKey 算子的使用问题,通过查阅官方文档解决

还有什么问题尚未解决,可能是什么原因导致的

暂无

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/662129.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-18
下一篇 2022-04-18

发表评论

登录后才能评论

评论列表(0条)

保存