import org.apache.spark.{SparkConf, SparkContext}

/**
 * Computes page-view (PV) and unique-visitor (UV) counts per site from a
 * tab-separated access log, using a local Spark context.
 *
 * Sample record (tab-separated):
 * {{{
 * userid  Shandong  2018-11-12  1542011088714  3445974150374613566  www.jd.com  Buy
 * }}}
 * Column 5 holds the site and column 0 holds the visitor identifier used for
 * the UV computation.
 */
object PvUv {

  /** Column delimiter of the input log. Must be a real tab, not the letter 't'. */
  private val FieldSeparator = "\t"

  /** Zero-based index of the visitor identifier column. */
  private val VisitorField = 0

  /** Zero-based index of the site column. */
  private val SiteField = 5

  /** Number of result rows printed for each metric. */
  private val TopN = 5

  /**
   * Entry point: reads `data/pvuvdata`, prints PV counts for `TopN` sites,
   * then prints the `TopN` sites with the highest UV counts.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stone").setMaster("local")
    val sc = new SparkContext(conf)

    // Always release the SparkContext, even if one of the jobs fails.
    try {
      sc.setLogLevel("ERROR")

      val file = sc.textFile("data/pvuvdata", 10)

      // PV: one hit per log line, summed per site.
      // No ordering is applied here, so the rows printed are not
      // necessarily the sites with the highest counts.
      file
        .map(line => (line.split(FieldSeparator)(SiteField), 1))
        .reduceByKey(_ + _)
        .take(TopN)
        .foreach(println)

      println("---------------uv---------------")

      // UV: reduce the log to (site, visitor) pairs so that distinct() leaves
      // exactly one row per visitor per site before counting.
      val keys = file.map { line =>
        val fields = line.split(FieldSeparator)
        (fields(SiteField), fields(VisitorField))
      }

      keys
        .distinct()
        .map { case (site, _) => (site, 1) }
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(TopN)
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)