public static void main(String[] args) { //TODO 0.env/创建环境 SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]"); //指定任务名和运行的服务器 JavaSparkContext sc = new JavaSparkContext(conf); //创建sc sc.setLogLevel("WARN"); //设置日志级别 //TODO 1.source/加载数据/创建RDD JavaRDDlines = sc.textFile("data/input/data.txt"); //读取数据文件 List data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD distData = sc.parallelize(data); //加载数据 //TODO 2.transformation //map算子 使原值乘2 JavaRDD disRDD = distData.map(new Function () { @Override public Integer call(Integer s) throws Exception { return s*2; } }); //map算子 使原值乘2 lambda 表达式 JavaRDD disRDD2 = distData.map((Function ) s -> s*2); //输出结果 disRDD.foreach(new VoidFunction () { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); //输出结果 lambda 表达式 disRDD2.foreach((VoidFunction ) s -> System.out.println(s)); //按空格分割数据 JavaRDD newlines= lines.flatMap(new FlatMapFunction () { @Override public Iterator call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //按空格分割数据 lambda 表达式 JavaRDD newlines2 = lines.flatMap((FlatMapFunction ) s->Arrays.asList(s.split(" ")).iterator()); //对单词计数 JavaPairRDD wordsRDD = newlines.mapToPair(new PairFunction () { @Override public Tuple2 call(String s) throws Exception { return new Tuple2<>(s,1); } }); //对单词计数 lambda 表达式 JavaPairRDD wordsRDD2 = newlines2.mapToPair((PairFunction ) s -> new Tuple2<>(s,1)); //分组聚合 JavaPairRDD newwordsRDD = wordsRDD.reduceByKey(new Function2 () { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); //分组聚合 lambda 表达式 JavaPairRDD newwordsRDD2 = wordsRDD2.reduceByKey((Function2 ) (a,b)->(a+b)); //输出到控制台 newwordsRDD.foreach(new VoidFunction >() { @Override public void call(Tuple2 t) throws Exception { System.out.println(t._1+" "+t._2); } }); //输出到控制台 lambda 表达式 newwordsRDD2.foreach((VoidFunction >) t-> System.out.println(t._1()+" "+t._2())); //输出到目录 newwordsRDD2.repartition(1).saveAsTextFile("data/output/spark2"); //join List > studentList = Arrays.asList( new Tuple2 (1, "张三"), new Tuple2 (2, "李四"), new Tuple2 (3, "王五"), new Tuple2 (4, "赵六")); List > scoreList = Arrays.asList( new Tuple2 (1, 86), new Tuple2 (2, 67), new Tuple2 (3, 78), new Tuple2 (4, 88)); //并行化两个RDD JavaPairRDD students = sc.parallelizePairs(studentList);; JavaPairRDD scores = sc.parallelizePairs(scoreList); //使用join算子关联两个RDD //join以后,会根据key进行join,并返回JavaPairRDD //JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的 //第二个泛型类型,是Tuple2 的类型,Tuple2的两个泛型分别为原始RDD的value的类型 JavaPairRDD > studentScores = students.join(scores); //打印 studentScores.foreach(new VoidFunction >>() { @Override public void call(Tuple2 > t) throws Exception { System.out.println("student id:" + t._1); System.out.println("student name:" + t._2._1); System.out.println("student score:" + t._2._2); System.out.println("=========================="); } }); sc.close(); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)