Spark RDD Operators: Data Processing in Practice


Table of contents

Examples of each operator
Explanation of the processing flow of some operators (taken from part of the final-project defense slides)
Writing a word-count program
Create a test file named RddTest.txt with the following content:

[root@hadoop01 ~]# cd /export/data/
[root@hadoop01 data]# vim RddTest.txt

hadoop spark
itcast rdd
scala spark
spark itcast
itcast hadoop

Start spark-shell

When you need to interact with HDFS, start the shell with spark-shell --master local[2], and add the MySQL driver jar from the Hive installation via --jars.
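For reference, a launch command along these lines might look like the following; the connector jar path is only an assumption and should be replaced with the MySQL driver jar that ships with your Hive installation:

# illustrative launch command; adjust the --jars path to your environment
spark-shell --master local[2] --jars /export/servers/hive/lib/mysql-connector-java-5.1.32.jar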

Examples of each operator

Create an RDD by reading a file from the local Linux file system

scala> val test=sc.textFile("file:///export/data/RddTest.txt")
test: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[1] at textFile at <console>:24

# Reading the file from HDFS

scala> val testRDD=sc.textFile("/data/words.txt")
testRDD: org.apache.spark.rdd.RDD[String] = /data/words.txt MapPartitionsRDD[3] at textFile at <console>:24
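textFile() also accepts an optional minimum number of partitions as a second argument; a minimal sketch, with an illustrative partition count:

// request at least 2 partitions when reading (the value 2 is illustrative)
val testRDD2 = sc.textFile("/data/words.txt", 2)
testRDD2.getNumPartitions   // >= 2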

Create an RDD from an existing collection or array by calling the parallelize() method on the SparkContext object

scala> val array=Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val arrRDD=sc.parallelize(array)
arrRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26
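parallelize() likewise takes an optional number of partitions (numSlices); a minimal sketch with an illustrative value:

// spread the collection over 3 partitions (numSlices = 3 is illustrative)
val arrRDD2 = sc.parallelize(array, 3)
arrRDD2.getNumPartitions   // 3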

filter() selects the elements that satisfy a given condition; those elements form a new RDD

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
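To check the result you can collect the filtered RDD; with the sample RddTest.txt above, the three lines containing "spark" should remain:

// print the lines that passed the filter
linesWithSpark.collect().foreach(println)
// expected for the sample file:
// hadoop spark
// scala spark
// spark itcast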

map() applies a function to every element; here each line is split into the words it contains, so each element of the new RDD is an array of words

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[9] at textFile at <console>:24

scala> val words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
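Note the element type RDD[Array[String]]: map() produces exactly one output element per input line. A quick way to see this, with the output expected for the sample file:

// each element is the array of words obtained from one line
words.collect().foreach(arr => println(arr.mkString(",")))
// hadoop,spark
// itcast,rdd
// scala,spark
// spark,itcast
// itcast,hadoop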


flatMap() splits every line of the file into individual word elements and flattens the results; those words form a new RDD

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
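Unlike map(), the per-line arrays are flattened, so each element is a single word; for example, the first few elements with the sample file:

// the arrays produced by split are flattened into individual words
words.take(4).foreach(println)
// hadoop
// spark
// itcast
// rdd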

The groupByKey operation takes an RDD of (Key, Value) pairs and groups the per-occurrence counts by word; the grouped pairs form a new RDD

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25

scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
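Each word is now paired with an Iterable holding all of its 1s, so counting the size of each group gives the word frequency; a minimal sketch:

// count the size of each group to obtain per-word frequencies
groupWords.mapValues(_.size).collect().foreach(println)
// with the sample file: (spark,3), (itcast,3), (hadoop,2), (scala,1), (rdd,1) (order not guaranteed)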

The reduceByKey operation takes an RDD of (Key, Value) pairs and aggregates the occurrence counts (the Values) for each word (the Key); the aggregated pairs form a new RDD

scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[20] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25

scala> val reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
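The same aggregation is often written with Scala's placeholder syntax; the result is identical:

// equivalent to (a, b) => a + b
val reduceWords2 = words.reduceByKey(_ + _)
reduceWords2.collect().foreach(println)   // (spark,3), (hadoop,2), ... (order not guaranteed)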

count() returns the number of elements in the dataset

scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> arrRdd.count()
res0: Long = 5

first() returns the first element of the dataset

scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> arrRdd.first()
res1: Int = 1

take(n) returns the first n elements of the dataset as an array

scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)

reduce() aggregates the elements of the dataset with a function func that takes two arguments and returns a single value

scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
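Any associative and commutative two-argument function can be used; for example, multiplying instead of adding:

// product of the elements instead of their sum
arrRdd.reduce(_ * _)   // 120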

collect() returns all the elements of the dataset to the driver as an array; use it with care on large datasets, since everything is pulled into the driver's memory

scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)

foreach() passes each element of the dataset to the function func and executes it

scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> arrRdd.foreach(x => println(x))
1
2
3
4
5
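Note that foreach() runs on the executors; in local mode the numbers show up in the shell, but on a cluster they would appear in the executor logs instead. When the values are actually needed on the driver, a common pattern is to collect first (fine here because the RDD is tiny):

// bring the elements back to the driver, then print them there
arrRdd.collect().foreach(println)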

Explanation of the processing flow of some operators (taken from part of the final-project defense slides):

[The slide figures illustrating these steps are not reproduced in this post.]

Writing a word-count program
scala> val lines=sc.textFile("file:///export/data/RddTest.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/RddTest.txt MapPartitionsRDD[31] at textFile at <console>:24

scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:25

scala> val wordAndOne = words.map(word=>(word,1))
wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[33] at map at <console>:25

scala> val wordCount = wordAndOne.reduceByKey((a,b)=>a+b)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[34] at reduceByKey at <console>:25

scala> wordCount.foreach(println)
(spark,3)
(scala,1)
(hadoop,2)
(itcast,3)
(rdd,1)
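The same pipeline can also be chained into a single expression and sorted by descending count; a minimal sketch:

// chained word count, sorted by frequency (highest first)
sc.textFile("file:///export/data/RddTest.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .collect()
  .foreach(println)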
