Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        // Configure SparkConf and start the Spark application
        SparkConf sparkConf = new SparkConf().setAppName("spark").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        // Read the file line by line with textFile
        JavaRDD<String> line = sc.textFile("D:/JavaCode/javascala/src/main/scala/rdd/builder/word");
        // flatMap operator: split each line into words
        JavaRDD<String> words = line.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // mapToPair operator: map each word to a pair, word => (word, 1)
        JavaPairRDD<String, Integer> word = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // reduceByKey operator: group identical words and sum their counts
        JavaPairRDD<String, Integer> wordCount = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Iterate and print the results
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + ":" + stringIntegerTuple2._2);
            }
        });
        sc.stop();
    }
}
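For illustration only: the contents of the word file are not shown in this post, so assume it holds the two lines "hello spark" and "hello scala". The final foreach would then print each word with its count (output order across partitions is not guaranteed):

    hello:2
    spark:1
    scala:1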
Scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // WordCount program
    // 1. Create the Spark application
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    // 2. Read the file
    val words = sc.textFile("D:/JavaCode/javascala/src/main/scala/rdd/builder/word")
    // 3. Split each line into words
    val word = words.flatMap(_.split(" "))
    // 4. Map each word to a pair: word => (word, 1)
    val word_map = word.map(x => (x, 1))
    // 5. Aggregate the counts by key
    val word_count = word_map.reduceByKey(_ + _)
    // 6. Print the results, sorted by count in descending order
    word_count.sortBy(x => -x._2).collect().foreach(println)
    word_count.groupByKey().foreach(println)
    // 7. Stop the Spark application
    sc.stop()
  }
}
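Because RDD transformations return RDDs, the whole Scala pipeline can also be collapsed into a single method chain, which previews the point made in the summary below. A minimal sketch reusing the same input path:

    // The same WordCount as one chained expression
    sc.textFile("D:/JavaCode/javascala/src/main/scala/rdd/builder/word")
      .flatMap(_.split(" "))   // split lines into words
      .map((_, 1))             // word => (word, 1)
      .reduceByKey(_ + _)      // sum the counts per word
      .collect()
      .foreach(println)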
Summary: Compared with Java, developing Spark applications in Scala is noticeably more concise and convenient.
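Either version needs spark-core on the classpath. A minimal build.sbt sketch for the Scala side (the Scala and Spark version numbers here are assumptions; pick ones that match your environment):

    // build.sbt — versions below are assumptions, not from the original post
    name := "javascala"
    scalaVersion := "2.12.18"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.2"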