假设这些信息都存存储在一个文件里
时间数 省份 城市 用户 广告
如下所示:
(中间字段使用空格隔开)
(1)数据转换为键值对的形式(省份-广告,点击数)
(2)计算相同key的点击数和(省份-广告,点击数和)
(3转换key的结构(省分广告,点击数和) => (省份,(广告,点击数和) )。
(4)按照key聚合。
(5)对值进行排序。
import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.Rating; import scala.Tuple2; public class ProviceAD { public static void main(String[] args) { // TODO Auto-generated method stub SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDDfiledata=sc.textFile("file:///home/gyq/eclipse-workspace/ALS/PeopleInfo.txt"); JavaRDD filedatafi=filedata.filter(f->{ String arr[]=f.split(" "); if(arr.length==5) return true; return false; }); JavaRDD data1=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[1]).iterator()); JavaRDD data2=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[2]).iterator()); JavaPairRDD ,Integer> PAdata=filedatafi.mapToPair(s->{ return new Tuple2 ,Integer>( new Tuple2 (s.split(" ")[1],s.split(" ")[4]),1); }); // PAdata.foreach(f->System.out.println(f));//第一题 JavaPairRDD ,Integer> ggs=PAdata.reduceByKey((x,y)->x+y); //ggs.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f));//第二题 JavaPairRDD > pairrdd1=ggs.mapToPair(f->{ return new Tuple2 >( Integer.valueOf(f._1._1),new Tuple2 (f._1._2,f._2())); }); //pairrdd1.foreach(f->System.out.println("省份"+ ",(广告,点击数)"+f));//第三题 JavaPairRDD >> gg=pairrdd1.groupByKey(); //gg.foreach(f->System.out.println("按照key聚合"+f._1+"------"+f._2));//第四题 JavaRDD rdd=gg.map(f->{ return Integer.valueOf(f._1)+" "+f._2; }); JavaRDD SortresultRDD = rdd.sortBy(f->{return f.split(" ")[0];},true,1);//fun5.1排序2 //SortresultRDD.foreach(f->System.out.println(f)); JavaPairRDD >> gss=gg.sortByKey(true,1);//fun5.2排序1 //gss.foreach(f1->System.out.println("ppp"+f1));//第五题 JavaRDD gss1=rdd.sortBy(f->{return f.split(" ")[0];},false,1);// SortresultRDD.foreach(f->System.out.println(f));//第5题 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)