spark基础算子java实现

spark基础算子java实现,第1张

spark基础算子java实现
public static void main(String[] args) {
        //TODO 0.env/创建环境
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]"); //指定任务名和运行的服务器
        JavaSparkContext sc = new JavaSparkContext(conf); //创建sc
        sc.setLogLevel("WARN"); //设置日志级别

        //TODO 1.source/加载数据/创建RDD
        JavaRDD lines = sc.textFile("data/input/data.txt"); //读取数据文件
        List data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD distData = sc.parallelize(data); //加载数据

        //TODO 2.transformation
        //map算子 使原值乘2
        JavaRDD disRDD = distData.map(new Function() {
            @Override
            public Integer call(Integer s) throws Exception {
                return s*2;
            }
        });
        //map算子 使原值乘2 lambda 表达式
        JavaRDD disRDD2 = distData.map((Function) s -> s*2);


        //输出结果
        disRDD.foreach(new VoidFunction() {
            @Override
            public void call(Integer s) throws Exception {
                System.out.println(s);
            }
        });
        //输出结果 lambda 表达式
        disRDD2.foreach((VoidFunction) s -> System.out.println(s));
        

        //按空格分割数据
        JavaRDD newlines= lines.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        //按空格分割数据  lambda 表达式
        JavaRDD newlines2 = lines.flatMap((FlatMapFunction) s->Arrays.asList(s.split(" ")).iterator());


        //对单词计数
        JavaPairRDD wordsRDD = newlines.mapToPair(new PairFunction() {
            @Override
            public Tuple2 call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });
        //对单词计数 lambda 表达式
        JavaPairRDD wordsRDD2 = newlines2.mapToPair((PairFunction) s -> new Tuple2<>(s,1));

        //分组聚合
        JavaPairRDD newwordsRDD = wordsRDD.reduceByKey(new Function2() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });
        //分组聚合 lambda 表达式
        JavaPairRDD newwordsRDD2 = wordsRDD2.reduceByKey((Function2) (a,b)->(a+b));

        //输出到控制台
        newwordsRDD.foreach(new VoidFunction>() {
            @Override
            public void call(Tuple2 t) throws Exception {
                System.out.println(t._1+" "+t._2);
            }
        });
        //输出到控制台 lambda 表达式
        newwordsRDD2.foreach((VoidFunction>) t-> System.out.println(t._1()+" "+t._2()));

        //输出到目录
        newwordsRDD2.repartition(1).saveAsTextFile("data/output/spark2");

        //join
        List> studentList = Arrays.asList(
                new Tuple2(1, "张三"),
                new Tuple2(2, "李四"),
                new Tuple2(3, "王五"),
                new Tuple2(4, "赵六"));

        List> scoreList = Arrays.asList(
                new Tuple2(1, 86),
                new Tuple2(2, 67),
                new Tuple2(3, 78),
                new Tuple2(4, 88));

        //并行化两个RDD
        JavaPairRDD students = sc.parallelizePairs(studentList);;
        JavaPairRDD scores = sc.parallelizePairs(scoreList);

        //使用join算子关联两个RDD
        //join以后,会根据key进行join,并返回JavaPairRDD
        //JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的
        //第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型
        JavaPairRDD> studentScores = students.join(scores);

        //打印
        studentScores.foreach(new VoidFunction>>() {
            @Override
            public void call(Tuple2> t)
                    throws Exception {
                System.out.println("student id:" + t._1);
                System.out.println("student name:" + t._2._1);
                System.out.println("student score:" + t._2._2);
                System.out.println("==========================");
            }
        });
        sc.close();


    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678805.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存