Operator examples, with an explanation of how selected operators work (adapted from part of the final-project defense slides): writing a word-count program
Create a test file named RddTest.txt with the following content:
[root@hadoop01 ~]# cd /export/data/
[root@hadoop01 data]# vim RddTest.txt
hadoop spark itcast rdd scala spark spark itcast itcast hadoop
Start spark-shell.
When interaction with HDFS is required, start spark-shell with --master local[2], and add the MySQL driver used by Hive via --jars.
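A possible launch command, shown only as a sketch; the driver jar path below is an assumption and should be replaced with the actual location of the MySQL connector on your machine:

[root@hadoop01 data]# spark-shell --master local[2] --jars /export/servers/hive/lib/mysql-connector-java.jar   # jar path is hypothetical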
Create an RDD by reading a file from the local Linux file system:
scala> val test=sc.textFile("file:///export/data/RddTest.txt")
test: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[1] at textFile at <console>:24

# Reading from HDFS
scala> val testRDD=sc.textFile("/data/words.txt")
testRDD: org.apache.spark.rdd.RDD[String] = /data/words.txt MapPartitionsRDD[3] at textFile at <console>:24
Create an RDD from an existing collection or array by calling the parallelize() method on the SparkContext object:
scala> val array=Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val arrRDD=sc.parallelize(array)
arrRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26
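parallelize() also accepts an optional second argument that sets the number of partitions, which can be checked with getNumPartitions. A minimal sketch; the partition count 3 is just an illustrative choice:

scala> val arrRDD3 = sc.parallelize(array, 3)   // second argument: number of partitions
scala> arrRDD3.getNumPartitions                 // returns 3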
filter() keeps only the elements that satisfy the given condition; the selected elements form a new RDD:
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
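Note that filter() is a transformation and is evaluated lazily; nothing is actually read until an action runs. A small sketch to inspect the result; the exact lines printed depend on how the words in RddTest.txt are split across lines:

scala> linesWithSpark.collect().foreach(println)   // prints every line that contains "spark"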
map() applies a function to every element of the RDD. Here each line of the file is split into words, so each element of the new RDD is an array of words (RDD[Array[String]]):
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[9] at textFile at <console>:24

scala> val words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
flatMap() splits every line of the file into individual words and then flattens the results, so the words themselves form a new RDD (RDD[String]):
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
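To see the difference between map() and flatMap(), collect both RDDs. Assuming RddTest.txt holds the ten words shown at the top of this section, the flatMap version yields the flat word list, while the map version yields one array per line (how the arrays are grouped depends on the file's line breaks):

scala> lines.flatMap(line=>line.split(" ")).collect()
// Array(hadoop, spark, itcast, rdd, scala, spark, spark, itcast, itcast, hadoop)

scala> lines.map(line=>line.split(" ")).collect()
// Array[Array[String]]: one inner array of words per line of the file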
groupByKey() takes an RDD of (Key, Value) pairs and groups the values by key; here the per-word counts of 1 are grouped by word, and the grouped pairs form a new RDD:
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25

scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
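groupByKey() only gathers the values; to turn the grouped RDD into word counts you still need to sum each Iterable, for example with mapValues. A minimal sketch, shown as an alternative to the reduceByKey version below (and generally less efficient, since all the 1s are shuffled):

scala> groupWords.mapValues(iter => iter.sum).collect()
// e.g. Array((spark,3), (scala,1), (hadoop,2), (itcast,3), (rdd,1)); the order of the pairs may vary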
reduceByKey() takes an RDD of (Key, Value) pairs and aggregates the values that share the same key; here the counts for each word (the key) are summed, and the resulting (word, total) pairs form a new RDD:
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[20] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25

scala> val reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
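If a ranked result is wanted, sortBy can order the pairs by their count before collecting. A minimal sketch; descending order is chosen here purely for illustration:

scala> reduceWords.sortBy(pair => pair._2, false).collect()
// e.g. Array((spark,3), (itcast,3), (hadoop,2), (rdd,1), (scala,1)); ties may appear in either order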
count() returns the number of elements in the dataset:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.count()
res0: Long = 5
first() returns the first element of the dataset:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> arrRdd.first()
res1: Int = 1
take(n) returns the first n elements of the dataset as an array:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)
reduce() aggregates the elements of the dataset with a function func that takes two arguments and returns a single value:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
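reduce() works with any associative binary function, not just addition; a small sketch computing the maximum of the same dataset:

scala> arrRdd.reduce((a,b) => if (a > b) a else b)   // returns 5, the maximum element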
collect() returns all the elements of the dataset as an array on the driver, so it should only be used on RDDs small enough to fit in driver memory:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
foreach() passes each element of the dataset to the function func and runs it:
scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> arrRdd.foreach(x => println(x))
1
2
3
4
5
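In local mode the println output appears in the shell as above; when running on a cluster, foreach executes on the executors, so the printed lines end up in executor logs rather than in the driver console. A common way to print on the driver instead is to collect first (again, only for small RDDs):

scala> arrRdd.collect().foreach(println)   // collect to the driver, then print locally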
Putting the operators above together, the complete word-count program is:

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[31] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:25

scala> val wordAndOne = words.map(word=>(word,1))
wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at map at <console>:25

scala> val wordCount = wordAndOne.reduceByKey((a,b)=>a+b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[34] at reduceByKey at <console>:25

scala> wordCount.foreach(println)
(spark,3)
(scala,1)
(hadoop,2)
(itcast,3)
(rdd,1)
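To keep the result instead of only printing it, the pair RDD can be written out with saveAsTextFile. A minimal sketch; the output directory below is a hypothetical path and must not already exist:

scala> wordCount.saveAsTextFile("file:///export/data/wordcount_output")   // hypothetical output dir; one part-* file per partition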