Cloudera系列(4)RDD Overview

Cloudera系列(4)RDD Overview,第1张

Cloudera系列(4)RDD Overview 一、RDD Overview 1、d性分布式数据集(RDDs)
  • RDDs是Spark核心的一部分
  • d性分布式数据集Resilient Distributed Dataset(RDD)
    • d性(Resilient):如果内存中的数据丢失,可以重新创建
    • 分布式(Distributed):跨集群处理
    • 数据集(Dataset):初始数据可以来自一个源,比如文件,也可以是以编程方式创建
  • 尽管名称如此,RDDs并不是Spark SQL的数据集对象
    • RDDs先于Spark SQL和Dataframe/Dataset API
2、比较RDDs与Dataframes和dataset
  • RDDs是结构化的
    • 没有定义列和行的模式(schema)
    • 不是类似于表;无法使用类似sql的转换进行查询,例如where and select
    • RDD转换使用lambda函数
  • RDDs可以包含任何类型的对象
    • Dataframes仅限于Row对象
    • Datasets仅限于Row对象、case类对象(products)和原始类型
  • 所有Spark语言(Python, Scala, Java)都使用RDDs
    • Scala和Java中的强类型,比如Datasets
  • RDDs不是由Catalyst优化器优化的
    • 手工编码的RDDs通常比Dataframes效率低
  • 可以使用RDDs来创建Dataframes和Datasets
    • RDDs通常用于将非结构化或半结构化数据转换为结构化形式
    • 也可以直接使用Dataframes和Datasets下的RDDs
二、RDD Data Sources 1、RDD Data Types
  • RDDs可以保存任何可序列化类型的元素
    • 基本类型,如整数、字符和布尔值
    • 集合,如字符串、列表、数组、元组和字典(包括嵌套集合类型)
    • Scala/Java对象(如果可序列化)
    • 混合类型
  • 有些RDDs是特定的,具有额外的功能
    • Pair RDDs
      • 由键值对key-value组成的RDDs
    • Double RDDs
      • 由数字数据组成的RDDs
2、RDD Data Sources
  • RDDs有几种类型的数据源
    • Files,包括文本文件和其他格式
    • 内存中的数据
    • Other RDDs
    • Datasets or Dataframes
3、Creating RDDs from Files
  • 使用SparkContext对象创建,而不是SparkSession创建
    • SparkContext是Spark核心库的一部分
    • SparkSession是Spark SQL库的一部分
    • 每个Spark上下文应用程序
    • 使用SparkSession.sparkContext访问Spark上下文
      • 在Spark shell中简称为sc
  • 使用Spark上下文创建基于文件的RDDs
    • 使用textFile或whole etextfiles读取文本文件
    • 使用hadoopFile或newAPIHadoopFile读取其他格式
      • 在Hadoop.20中引入了Hadoop“new API”
      • Spark同时支持向后兼容
三、Creating and Saving RDDs 1、从文本文件(Text Files)创建RDDs
  • SparkContext.textFile读取以换行符结束的文本文件
    • 接受单个文件、文件目录、文件通配符列表或逗号分隔的文件列表
    • 示例:
      • textFile(“myfile.txt”)
      • textFile(“mydata/”)
      • textFile(“mydata/*.log”)
      • textFile(“myfile1.txt,myfile2.txt”)

python:

myRDD = spark.sparkContext.textFile("mydata/")
  • textFile将文件中的每一行映射到一个单独的RDD元素
    • 只支持换行结束的文本
2、多行文本元素
  • textFile将文件中的每一行映射到一个单独的RDD元素
    • 对于XML或JSON等多行输入格式的文件呢?
  • 使用wholeTextFiles
    • 将目录中每个文件的全部内容映射到单个RDD元素
    • 仅适用于小文件(元素必须适合内存)

user1.json

{
"firstName":"Fred",
"lastName":"Flintstone",
"userid":"123"
}

user2.json

{
"firstName":"Barney",
"lastName":"Rubble",
"userid":"234"
}
3、示例:使用wholeTextFiles
userRDD = spark.sparkContext.wholeTextFiles("userFiles")

4、从集合(Collections)中创建RDDs
  • 可以从集合而不是文件中创建RDDs
    • SparkContext.parallelize(collection)
  • 经常用于
    • 测试
    • 以编程方式生成数据
    • 与其他系统或库集成
    • 学习

python:

myData = ["Alice","Carlos","Frank","Barbara"]

myRDD = sc.parallelize(myData)
5、Saving RDDs
  • 可以保存RDDs到相同的数据源类型以支持读取RDDs
    • 使用RDD.saveAsTextFile保存为指定目录下的纯文本文件
    • 使用RDD.saveAsHadoopFile或者saveAsNewAPIHadoopFile带有指定的Hadoop OutputFormat以使用其他格式保存
  • 指定的保存目录不存在
myRDD.saveAsTextFile("mydata/")
四、RDD *** 作(Operations) 1、RDD Operations
  • 两种常见的RDD *** 作
    • Actions:将一个值返回给Spark驱动程序或将数据保存到数据源
    • Transformations:在当前RDD的基础上定义一个新的RDD
  • RDDs *** 作是惰性执行的
    • Actions触发基本RDD transformations的执行
2、RDD Action Operations
  • 一些公共的actions
    • count:返回元素的个数
    • first:返回第一个元素
    • take(n):返回前n个元素的数组(Scala)或列表(Python)
    • collect:返回所有元素的数组(Scala)或列表(Python)
    • saveAsTextFile(dir):保存为文本文件

python:

myRDD = sc.textFile("purplecow.txt")

for line in myRDD.take(2): 
 print(line)
I've never seen a purple cow.
I never hope to see one;

scala:

val myRDD = sc.textFile("purplecow.txt")

for (line <- myRDD.take(2))
 println(line) 
I've never seen a purple cow.
I never hope to see one;
3、RDD Transformation Operations
  • Transformations从现有的RDD创建一个新的RDD
  • RDDs是不可变的
    • RDD中的数据永远不变
    • 转换数据从而创建新的RDD
  • 一个转换 *** 作(transformation operation)执行一个转换函数(transformation function)
    • 该功能将RDD中的元素转换为新的元素
    • 有些转换实现自己的转换逻辑
    • 对于许多项目来说,必须提供执行转换的功能
  • Transformation operations包括:
    • distinct:创建一个新的RDD,删除基本RDD中的重复元素
    • union(rdd):通过将一个rdd中的数据附加到另一个rdd中来创建一个新的rdd
    • map(function):通过对基本RDD中的每条记录执行函数来创建一个新的RDD
    • filter(function):通过根据布尔函数在基本RDD中包含或排除每一条记录来创建一个新的RDD
4、Example:distinct and union Transformations

cities1.csv

Boston,MA
Palo Alto,CA
Santa Fe,NM
Palo Alto,CA

cities2.csv

Calgary,AB
Chicago,IL
Palo Alto,CA
distinctRDD = sc.textFile("cities1.csv").distinct()

for city in distinctRDD.collect():
 print(city)
Boston,MA
Palo Alto,CA
Santa Fe,NM

unionRDD = sc.textFile("cities2.csv").union(distinctRDD)

for city in unionRDD.collect(): 
 print(city)
Calgary,AB
Chicago,IL
Palo Alto,CA
Boston,MA
Palo Alto,CA
Santa Fe,NM
五、基本要点
  • RDDs(d性分布式数据集)是Spark中的一个关键概念
    • 表示元素的分布式集合
    • 元素可以是任何类型
  • RDDs是由数据源创建的
    • 文本文件和其他数据文件格式
    • 其他rdd中的数据
    • 内存中的数据
    • 数据帧和数据集
  • RDDs包含非结构化数据
    • 没有Dataframes和dataset这样的关联模式
  • RDD Operations
    • Transformations:在现有RDD的基础上创建新的RDD
    • Actions:从RDD返回一个值
六、实践练习:使用RDD 1、查看RDD *** 作的API文档

1、查看RDD类的API文档(它在Python模块pyspark和Scala包org.apache.spark.rdd中)。注意各种可用的 *** 作

2、从文本文件读取和显示数据

2、通过在单独的窗口(不是Spark shell)中查看(不编辑)文件,查看您将要使用的简单文本文件。该文件位于$DEVDATA/ frostroad.txt。
3、在远程桌面的终端窗口中,上传文本文件到HDFS目录/devsh_loudacre。

$ hdfs dfs -put $DEVDATA/frostroad.txt /devsh_loudacre/


4、在Spark shell中,定义一个基于frostroad.txt文本文件的RDD。

pyspark> myRDD = sc.textFile("/devsh_loudacre/frostroad.txt")
scala> val myRDD = sc.textFile("/devsh_loudacre/frostroad.txt")


5、使用命令补全,您可以看到可以在RDD上执行的所有可用转换和 *** 作。myRDD类型。然后按TAB键。
6、Spark尚未读取该文件。在您对RDD执行 *** 作之前,它不会这样做。尝试使用count动作来计算RDD中的元素数量:

pyspark> myRDD.count()
scala> myRDD.count


7、调用collect *** 作将RDD中的所有数据返回给Spark驱动。注意返回值的类型;在Python中是字符串列表,在Scala中是字符串数组。注意:collect返回整个数据集。对于像这样的非常小的rdd,这很方便,但是对于更典型的大数据集,使用collect时要小心。

pyspark> lines = myRDD.collect()
scala> val lines = myRDD.collect


8、通过循环遍历集合来显示收集的数据的内容。

pyspark> for line in lines: print(line)
scala> for (line <- lines) println(line)

3、在RDD中转换数据

9、在这个练习中,您将加载两个包含各种手机制造商名称的文本文件,并将一个附加到另一个。通过在单独的窗口中查看(不编辑)文件,查看您将要使用的两个文本文件。在$DEVDATA目录下的文件是make .txt和make .txt。

10、将两个文本文件上传到HDFS目录“/devsh_loudacre”。

$ hdfs dfs -put $DEVDATA/makes*.txt /devsh_loudacre/

11、在Spark中,根据/devsh_loudacre/ makes1.txt文件创建一个名为makes1RDD的RDD。

pyspark> makes1RDD = sc.textFile("/devsh_loudacre/makes1.txt")
scala> val makes1RDD = sc.textFile("/devsh_loudacre/makes1.txt")

12、使用collect来显示makes1RDD数据的内容,然后循环使用返回的collection。

pyspark> for make in makes1RDD.collect(): print(make)
scala> for (make <- makes1RDD.collect()) println(make)


13、重复上述步骤,根据第二个文件/devsh_loudacre/makes2.txt创建并显示名为makes2RDD的RDD。

pyspark> makes2RDD = sc.textFile("/devsh_loudacre/makes2.txt")
scala> val makes2RDD = sc.textFile("/devsh_loudacre/makes2.txt")

14、通过使用联合转换将第二个RDD附加到第一个RDD,创建一个新的RDD。

pyspark> allMakesRDD = makes1RDD.union(makes2RDD)
scala> val allMakesRDD = makes1RDD.union(makes2RDD)

15、收集并显示新的allMakesRDD RDD的内容。

pyspark> for make in allMakesRDD.collect(): print(make)
scala> for (make <- allMakesRDD.collect()) println(make)

16、使用distinct transformation 从allMakesRDD中删除重复。收集并显示内容,以确认删除了重复的元素。

17、可选:尝试在上面创建的rdd上执行不同的转换,如交集(intersection)、相减(subtract)或zip。详情请参阅RDD API文档。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5117284.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存