- RDDs是Spark核心的一部分
- d性分布式数据集Resilient Distributed Dataset(RDD)
- d性(Resilient):如果内存中的数据丢失,可以重新创建
- 分布式(Distributed):跨集群处理
- 数据集(Dataset):初始数据可以来自一个源,比如文件,也可以是以编程方式创建
- 尽管名称如此,RDDs并不是Spark SQL的数据集对象
- RDDs先于Spark SQL和Dataframe/Dataset API
- RDDs是结构化的
- 没有定义列和行的模式(schema)
- 不是类似于表;无法使用类似sql的转换进行查询,例如where and select
- RDD转换使用lambda函数
- RDDs可以包含任何类型的对象
- Dataframes仅限于Row对象
- Datasets仅限于Row对象、case类对象(products)和原始类型
- 所有Spark语言(Python, Scala, Java)都使用RDDs
- Scala和Java中的强类型,比如Datasets
- RDDs不是由Catalyst优化器优化的
- 手工编码的RDDs通常比Dataframes效率低
- 可以使用RDDs来创建Dataframes和Datasets
- RDDs通常用于将非结构化或半结构化数据转换为结构化形式
- 也可以直接使用Dataframes和Datasets下的RDDs
- RDDs可以保存任何可序列化类型的元素
- 基本类型,如整数、字符和布尔值
- 集合,如字符串、列表、数组、元组和字典(包括嵌套集合类型)
- Scala/Java对象(如果可序列化)
- 混合类型
- 有些RDDs是特定的,具有额外的功能
- Pair RDDs
- 由键值对key-value组成的RDDs
- Double RDDs
- 由数字数据组成的RDDs
- Pair RDDs
- RDDs有几种类型的数据源
- Files,包括文本文件和其他格式
- 内存中的数据
- Other RDDs
- Datasets or Dataframes
- 使用SparkContext对象创建,而不是SparkSession创建
- SparkContext是Spark核心库的一部分
- SparkSession是Spark SQL库的一部分
- 每个Spark上下文应用程序
- 使用SparkSession.sparkContext访问Spark上下文
- 在Spark shell中简称为sc
- 使用Spark上下文创建基于文件的RDDs
- 使用textFile或whole etextfiles读取文本文件
- 使用hadoopFile或newAPIHadoopFile读取其他格式
- 在Hadoop.20中引入了Hadoop“new API”
- Spark同时支持向后兼容
- SparkContext.textFile读取以换行符结束的文本文件
- 接受单个文件、文件目录、文件通配符列表或逗号分隔的文件列表
- 示例:
- textFile(“myfile.txt”)
- textFile(“mydata/”)
- textFile(“mydata/*.log”)
- textFile(“myfile1.txt,myfile2.txt”)
python:
myRDD = spark.sparkContext.textFile("mydata/")
- textFile将文件中的每一行映射到一个单独的RDD元素
- 只支持换行结束的文本
- 只支持换行结束的文本
- textFile将文件中的每一行映射到一个单独的RDD元素
- 对于XML或JSON等多行输入格式的文件呢?
- 使用wholeTextFiles
- 将目录中每个文件的全部内容映射到单个RDD元素
- 仅适用于小文件(元素必须适合内存)
user1.json
{ "firstName":"Fred", "lastName":"Flintstone", "userid":"123" }
user2.json
{ "firstName":"Barney", "lastName":"Rubble", "userid":"234" }3、示例:使用wholeTextFiles
userRDD = spark.sparkContext.wholeTextFiles("userFiles")4、从集合(Collections)中创建RDDs
- 可以从集合而不是文件中创建RDDs
- SparkContext.parallelize(collection)
- 经常用于
- 测试
- 以编程方式生成数据
- 与其他系统或库集成
- 学习
python:
myData = ["Alice","Carlos","Frank","Barbara"] myRDD = sc.parallelize(myData)5、Saving RDDs
- 可以保存RDDs到相同的数据源类型以支持读取RDDs
- 使用RDD.saveAsTextFile保存为指定目录下的纯文本文件
- 使用RDD.saveAsHadoopFile或者saveAsNewAPIHadoopFile带有指定的Hadoop OutputFormat以使用其他格式保存
- 指定的保存目录不存在
myRDD.saveAsTextFile("mydata/")四、RDD *** 作(Operations) 1、RDD Operations
- 两种常见的RDD *** 作
- Actions:将一个值返回给Spark驱动程序或将数据保存到数据源
- Transformations:在当前RDD的基础上定义一个新的RDD
- RDDs *** 作是惰性执行的
- Actions触发基本RDD transformations的执行
- 一些公共的actions
- count:返回元素的个数
- first:返回第一个元素
- take(n):返回前n个元素的数组(Scala)或列表(Python)
- collect:返回所有元素的数组(Scala)或列表(Python)
- saveAsTextFile(dir):保存为文本文件
python:
myRDD = sc.textFile("purplecow.txt") for line in myRDD.take(2): print(line) I've never seen a purple cow. I never hope to see one;
scala:
val myRDD = sc.textFile("purplecow.txt") for (line <- myRDD.take(2)) println(line) I've never seen a purple cow. I never hope to see one;3、RDD Transformation Operations
- Transformations从现有的RDD创建一个新的RDD
- RDDs是不可变的
- RDD中的数据永远不变
- 转换数据从而创建新的RDD
- 一个转换 *** 作(transformation operation)执行一个转换函数(transformation function)
- 该功能将RDD中的元素转换为新的元素
- 有些转换实现自己的转换逻辑
- 对于许多项目来说,必须提供执行转换的功能
- Transformation operations包括:
- distinct:创建一个新的RDD,删除基本RDD中的重复元素
- union(rdd):通过将一个rdd中的数据附加到另一个rdd中来创建一个新的rdd
- map(function):通过对基本RDD中的每条记录执行函数来创建一个新的RDD
- filter(function):通过根据布尔函数在基本RDD中包含或排除每一条记录来创建一个新的RDD
cities1.csv
Boston,MA Palo Alto,CA Santa Fe,NM Palo Alto,CA
cities2.csv
Calgary,AB Chicago,IL Palo Alto,CA
distinctRDD = sc.textFile("cities1.csv").distinct() for city in distinctRDD.collect(): print(city) Boston,MA Palo Alto,CA Santa Fe,NM unionRDD = sc.textFile("cities2.csv").union(distinctRDD) for city in unionRDD.collect(): print(city) Calgary,AB Chicago,IL Palo Alto,CA Boston,MA Palo Alto,CA Santa Fe,NM五、基本要点
- RDDs(d性分布式数据集)是Spark中的一个关键概念
- 表示元素的分布式集合
- 元素可以是任何类型
- RDDs是由数据源创建的
- 文本文件和其他数据文件格式
- 其他rdd中的数据
- 内存中的数据
- 数据帧和数据集
- RDDs包含非结构化数据
- 没有Dataframes和dataset这样的关联模式
- RDD Operations
- Transformations:在现有RDD的基础上创建新的RDD
- Actions:从RDD返回一个值
1、查看RDD类的API文档(它在Python模块pyspark和Scala包org.apache.spark.rdd中)。注意各种可用的 *** 作
2、从文本文件读取和显示数据2、通过在单独的窗口(不是Spark shell)中查看(不编辑)文件,查看您将要使用的简单文本文件。该文件位于$DEVDATA/ frostroad.txt。
3、在远程桌面的终端窗口中,上传文本文件到HDFS目录/devsh_loudacre。
$ hdfs dfs -put $DEVDATA/frostroad.txt /devsh_loudacre/
4、在Spark shell中,定义一个基于frostroad.txt文本文件的RDD。
pyspark> myRDD = sc.textFile("/devsh_loudacre/frostroad.txt")
scala> val myRDD = sc.textFile("/devsh_loudacre/frostroad.txt")
5、使用命令补全,您可以看到可以在RDD上执行的所有可用转换和 *** 作。myRDD类型。然后按TAB键。
6、Spark尚未读取该文件。在您对RDD执行 *** 作之前,它不会这样做。尝试使用count动作来计算RDD中的元素数量:
pyspark> myRDD.count()
scala> myRDD.count
7、调用collect *** 作将RDD中的所有数据返回给Spark驱动。注意返回值的类型;在Python中是字符串列表,在Scala中是字符串数组。注意:collect返回整个数据集。对于像这样的非常小的rdd,这很方便,但是对于更典型的大数据集,使用collect时要小心。
pyspark> lines = myRDD.collect()
scala> val lines = myRDD.collect
8、通过循环遍历集合来显示收集的数据的内容。
pyspark> for line in lines: print(line)
scala> for (line <- lines) println(line)3、在RDD中转换数据
9、在这个练习中,您将加载两个包含各种手机制造商名称的文本文件,并将一个附加到另一个。通过在单独的窗口中查看(不编辑)文件,查看您将要使用的两个文本文件。在$DEVDATA目录下的文件是make .txt和make .txt。
10、将两个文本文件上传到HDFS目录“/devsh_loudacre”。
$ hdfs dfs -put $DEVDATA/makes*.txt /devsh_loudacre/
11、在Spark中,根据/devsh_loudacre/ makes1.txt文件创建一个名为makes1RDD的RDD。
pyspark> makes1RDD = sc.textFile("/devsh_loudacre/makes1.txt")
scala> val makes1RDD = sc.textFile("/devsh_loudacre/makes1.txt")
12、使用collect来显示makes1RDD数据的内容,然后循环使用返回的collection。
pyspark> for make in makes1RDD.collect(): print(make)
scala> for (make <- makes1RDD.collect()) println(make)
13、重复上述步骤,根据第二个文件/devsh_loudacre/makes2.txt创建并显示名为makes2RDD的RDD。
pyspark> makes2RDD = sc.textFile("/devsh_loudacre/makes2.txt")
scala> val makes2RDD = sc.textFile("/devsh_loudacre/makes2.txt")
14、通过使用联合转换将第二个RDD附加到第一个RDD,创建一个新的RDD。
pyspark> allMakesRDD = makes1RDD.union(makes2RDD)
scala> val allMakesRDD = makes1RDD.union(makes2RDD)
15、收集并显示新的allMakesRDD RDD的内容。
pyspark> for make in allMakesRDD.collect(): print(make)
scala> for (make <- allMakesRDD.collect()) println(make)
16、使用distinct transformation 从allMakesRDD中删除重复。收集并显示内容,以确认删除了重复的元素。
17、可选:尝试在上面创建的rdd上执行不同的转换,如交集(intersection)、相减(subtract)或zip。详情请参阅RDD API文档。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)