使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3

使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3,第1张

使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3

假设这些信息都存存储在一个文件里
时间数 省份 城市 用户 广告
如下所示:
(中间字段使用空格隔开)

 

(1)数据转换为键值对的形式(省份-广告,点击数)
(2)计算相同key的点击数和(省份-广告,点击数和)
(3转换key的结构(省分广告,点击数和) => (省份,(广告,点击数和) )。
(4)按照key聚合。
(5)对值进行排序

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;

public class ProviceAD {
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		
	SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaRDD filedata=sc.textFile("file:///home/gyq/eclipse-workspace/ALS/PeopleInfo.txt");
    JavaRDD filedatafi=filedata.filter(f->{
    	String arr[]=f.split(" ");
    	if(arr.length==5)
    		return true;
    	return false;
    });
    JavaRDD data1=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[1]).iterator());
    JavaRDD data2=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[2]).iterator());
    JavaPairRDD,Integer> PAdata=filedatafi.mapToPair(s->{
    	return new Tuple2,Integer>(
				new Tuple2(s.split(" ")[1],s.split(" ")[4]),1);
    		
    	 });
   // PAdata.foreach(f->System.out.println(f));//第一题
    JavaPairRDD,Integer> ggs=PAdata.reduceByKey((x,y)->x+y);
   //ggs.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f));//第二题
    
    JavaPairRDD> pairrdd1=ggs.mapToPair(f->{
    	return new Tuple2>(
    			Integer.valueOf(f._1._1),new Tuple2(f._1._2,f._2()));
    });
    //pairrdd1.foreach(f->System.out.println("省份"+ ",(广告,点击数)"+f));//第三题
    JavaPairRDD>> gg=pairrdd1.groupByKey();
    
    //gg.foreach(f->System.out.println("按照key聚合"+f._1+"------"+f._2));//第四题
    JavaRDD rdd=gg.map(f->{
    	return Integer.valueOf(f._1)+" "+f._2;
    });
    JavaRDD SortresultRDD = rdd.sortBy(f->{return f.split(" ")[0];},true,1);//fun5.1排序2
    //SortresultRDD.foreach(f->System.out.println(f));
   JavaPairRDD>> gss=gg.sortByKey(true,1);//fun5.2排序1
   //gss.foreach(f1->System.out.println("ppp"+f1));//第五题
   JavaRDD gss1=rdd.sortBy(f->{return f.split(" ")[0];},false,1);//

   SortresultRDD.foreach(f->System.out.println(f));//第5题
   
   
	
	}
}

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5605731.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存