- Dataframes从数据源读取数据,并将数据写入数据源
- Spark SQL支持广泛的数据源类型和格式
- Text files
- CSV, JSON, plain text
- Binary format files(二进制格式文件)
- Apache Parquet, Apache ORC, Apache Avro data format
- Tables
- Hive metastore, JDBC
- Cloud
- Such as Amazon S3 and Microsoft ADLS
- Text files
- 也可以使用自定义或第三方数据源类型
- Parquet是Dataframe数据的一种非常常见的文件格式
- Parquet的特点
- 优化的二进制存储的结构化数据
- 架构元数据被嵌入到文件中
- 高效的性能和大数据量
- 许多Hadoop生态系统工具支持
- Spark、Hadoop MapReduce、Hive等
- 使用parquet-tools查看Parquet文件模式和数据
(1)使用head显示前几条记录
$ parquet-tools head mydatafile.parquet
(2)使用schema查看模式
$ parquet-tools schema mydatafile.parquet3、从数据源创建Dataframe
- spark.read返回一个DataframeReader对象
- 使用DataframeReader设置来指定如何从数据中加载数据源
- format:表示数据源类型,如csv、json、parquet等(默认为parquet)
- option:指定底层数据源的键/值设置
- schema:指定要使用的模式,而不是从数据源推断一个模式
- 基于数据源创建Dataframe
- 从一个或多个文件中加载数据
- 读取CSV文本文件
- 将文件中的第一行视为标题,而不是数据
python:
myDF = spark.read.format("csv").option("header","true").load("/loudacre/myFile.csv")
- 读取Avro文件
python:
myDF = spark.read.format("avro").load("/loudacre/myData.avro")5、DataframeReader便捷函数
- 可以为某些格式调用特定格式的加载函数
- 代替设置格式和使用加载的快捷方式
- 下面两个代码示例是相同的,常规写法与简洁写法
spark.read.format("csv").load("/loudacre/myFile.csv")
spark.read.csv("/loudacre/myFile.csv")6、指定数据源文件位置
- 从文件数据源读取时,必须指定位置
- 位置可以是单个文件、文件列表、目录或通配符
- 例如
- spark.read.json(“myfile.json”)
- spark.read.json(“mydata/”)
- spark.read.json(“mydata/*.json”)
- spark.read.json(“myfile1.json”,“myfile2.json”)
- 文件和目录由绝对或相对URI引用
- 相对URI(使用默认文件系统)
- myfile.json
- 绝对URI
- hdfs://nnhost/loudacre/myfile.json
- file:/home/training/myfile.json
- 相对URI(使用默认文件系统)
- Apache Hive提供对HDFS中的数据类似数据库的访问
- 将模式(Schemas)应用于HDFS文件
- 元数据存储在Hive metastore中
- Spark可以对Hive tables进行读写 *** 作
- 从Hive metastore推断Dataframe schema
- Spark Hive支持必须开启并配置Hive的位置
- warehouse in HDFS
python:
usersDF = spark.read.table("users")8、从内存中的数据创建Dataframe
- 可以从内存中的数据集合创建Dataframe
- 对测试和集成有用
val mydata = List(("Josiah","Bartlet"),("Harry","Potter")) val myDF = spark.createDataframe(mydata) myDF.show +------+-------+ | _1| _2| +------+-------+ |Josiah|Bartlet| | Harry| Potter| +------+-------+二、将Dataframes保存到Data Sources 1、关键的DataframeWriter Functions
- Dataframe write function返回一个DataframeWriter
- 将数据保存到一个数据源,如表或一组文件
- 工作原理类似于DataframeReader
- DataframeWriter方法
- format指定数据源类型
- mode决定目录或表是否已经存在
- error、overwrite、append或ignore(默认为error)
- partitionBy将数据以表单的形式存储在分区目录中
column=value(和Hive分区一样) - option指定目标数据源的属性
- save将数据以文件的形式保存在指定的目录下
- 或者使用json、csv、parquet等
- saveAsTable将数据保存到Hive metastore表中
- 基于Hive仓库默认的数据位置
- 设置path选项以覆盖位置
- 向一个名为my_table的Hive metastore table写入数据
- 如果表已经存在,则追加数据
- 换个位置
myDF.write.mode("append").option("path","/loudacre/mydata").saveAsTable("my_table")
- 在mydata目录下以Parquet文件的形式写入数据
myDF.write.save("mydata")3、保存数据到文件
- 从Dataframe中保存数据时,必须指定一个目录
- Spark将数据保存到该目录下的一个或多个部件文件中
devDF.write.csv("devices")
$ hdfs dfs -ls devices Found 4 items -rw-r--r-- 3 training training 0 … devices/_SUCCESS -rw-r--r-- 3 training training 2119 … devices/part-00000-e0fa6381-….csv -rw-r--r-- 3 training training 2202 … devices/part-00001-e0fa6381-….csv -rw-r--r-- 3 training training 2333 … devices/part-00002-e0fa6381-….csv三、Dataframe Schemas 1、Dataframe Schemas
- 每个Dataframe都有一个相关的模式(schema)
- 模式(schema)用于定义列的名称和类型
- 模式(schema)是不可变的并在Dataframe创建时定义
myDF.printSchema() root |-- lastName: string (nullable = true) |-- firstName: string (nullable = true) |-- age: integer (nullable = true)
- 当从数据源创建一个新的Dataframe时,模式可以:
- 自动从数据源推断
- 以编程方式指定
- 当Dataframe被转换(transformation)创建时,Spark会基于查询(query)计算新的模式(schema)。
- Spark可以从结构化数据加载模式,例如:
- Parquet、ORC和Avro数据文件 ——schema被嵌入到文件中
- Hive table ——schema在Hive metastore中定义
- Parent Dataframes
- Spark还可以尝试从半结构化数据源推断模式
- 例如JSON和CSV
spark.read.option("inferSchema","true").csv("people.csv").printSchema() root |-- _c0: integer (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) |-- _c3: integer (nullable = true)4、示例:推断CSV文件的模式(有表头)
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema() root |-- pcode: integer (nullable = true) |-- lastName: string (nullable = true) |-- firstName: string (nullable = true) |-- age: integer (nullable = true)5、Inferred Schemas(推断模式)与Manual Schemas(手动模式)
- 依赖于Spark的自动推断模式的缺点
- 推断时需要初始文件扫描,这可能需要很长时间
- 推断出来的模式对您的用例可能不正确
- 手动定义模式
- 模式Schemas是一个包含StructField列表的StructType对象
- 每个StructField表示模式中的一列,指定了:
- 列的名称
- 列的数据类型
- 数据是否可以为空(可选——默认为true)
- 如下推断的CSV文件模式是不正确的
- pcode列应该是一个字符串
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema() root |-- pcode: integer (nullable = true) |-- lastName: string (nullable = true) |-- firstName: string (nullable = true) |-- age: integer (nullable = true)7、示例:以编程方式手动定义模式(Python)
from pyspark.sql.types import * columnsList = [ StructField("pcode", StringType()), StructField("lastName", StringType()), StructField("firstName", StringType()), StructField("age", IntegerType())] peopleSchema = StructType(columnsList)8、示例:以编程方式手动定义模式(Scala)
import org.apache.spark.sql.types._ val columnsList = List( StructField("pcode", StringType), StructField("lastName", StringType), StructField("firstName", StringType), StructField("age", IntegerType)) val peopleSchema = StructType(columnsList)9、示例:应用手动模式
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema() root |-- pcode: string (nullable = true) |-- lastName: string (nullable = true) |-- firstName: string (nullable = true) |-- age: integer (nullable = true)四、Eager and Lazy Execution 1、急于执行和懒惰执行
- 一旦代码中出现语句, *** 作就会立即执行
- 当只在结果被引用时才执行 *** 作时, *** 作是惰性的
- Spark查询执行起来既迟缓又急切
- Dataframe模式是急切确定的
- 数据转换被延迟执行
- 当在一系列转换中调用一个动作时,会触发惰性执行
python:
> usersDF = spark.read.json("users.json")3、Eager and Lazy Execution(2)
python:
> usersDF = spark.read.json("users.json") > nameAgeDF = usersDF.select("name","age")
python:
> usersDF = spark.read.json("users.json") > nameAgeDF = usersDF.select("name","age") > nameAgeDF.show()
python:
> usersDF = spark.read.json("users.json") > nameAgeDF = usersDF.select("name","age") > nameAgeDF.show() +-------+----+ | name| age| +-------+----+ | Alice|null| |Brayden| 30| | Carla| 19| …
- Dataframes可以从如下几种不同类型的数据源加载和保存
- CSV和JSON等半结构化文本文件
- Parquet、Avro和ORC等结构化二进制格式
- Hive和JDBC表
- Dataframes可以从数据源中推断出一个模式,或者手动定义一个模式
- Dataframe schema是在创建时确定的,但是查询是延迟执行的(当一个动作被调用时)
1、本练习使用基于devsh Hive数据库中的accounts表的Dataframe。您可以使用Beeline SQL命令行访问Hive来查看模式
$ beeline -u jdbc:hive2://localhost:10000 -e "DESCRIBE devsh.accounts"
2、如果它还没有运行,请启动Spark shell(您可以选择Scala或Python)。
spark-shell
3、使用Hive表devsh.accounts创建一个新的Dataframe。
pyspark> accountsDF = spark.read.table("devsh.accounts")
scala> val accountsDF = spark.read.table("devsh.accounts")
4、打印模式和Dataframe的前几行,注意模式与Hive表的模式是一致的。
5、使用邮政编码为94913的帐户数据行创建一个新的Dataframe,并将结果保存到HDFS目录/devsh_loudacre/ accounts_zip94913中的CSV文件中。您可以在单个命令中完成此 *** 作,如下所示,也可以使用多个命令。
pyspark> accountsDF.where("zipcode = 94913").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")
scala> accountsDF.where("zipcode = '94913'").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")
6、使用hdfs在单独的终端窗口查看hdfs中的/devsh_loudacre/accounts_zip94913目录和其中一个保存文件中的数据。确认CSV文件包含标题行,并且只包含所选邮政编码的记录。
7、可选:尝试基于上面创建的CSV文件创建一个新的Dataframe。比较原始accountsDF和新的Dataframe的模式。有什么不同吗?再次尝试,这次将inferSchema选项设置为true并再次进行比较。
8、如果您还没有这样做,请查看HDFS文件/devsh_loudacre/devices.json中的数据。
9、基于设备创建一个新的Dataframe。json文件。(该命令在推断模式时可能需要几秒钟时间。)
pyspark> devDF = spark.read.json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.json("/devsh_loudacre/devices.json")
10、查看devDF Dataframe的模式。注意Spark从JSON文件中推断出的列名和类型。特别要注意的是,release_dt列的类型是string,而列中的数据实际上代表一个时间戳。
11、定义正确指定此Dataframe列类型的模式。首先导入包含必要类和类型定义的包。
注:“_”代表全部,与JAVA的“*”一致意思
pyspark> from pyspark.sql.types import *
scala> import org.apache.spark.sql.types._
12、接下来,创建一个StructField对象集合,这些对象表示列定义。release_dt列应该是一个时间戳。
pyspark> devColumns = [StructField("devnum",LongType()), StructField("make",StringType()), StructField("model",StringType()), StructField("release_dt",TimestampType()), StructField("dev_type",StringType())]
scala> val devColumns = List( StructField("devnum",LongType), StructField("make",StringType), StructField("model",StringType), StructField("release_dt",TimestampType), StructField("dev_type",StringType))
13、使用列定义列表创建模式(StructType对象)。
pyspark> devSchema = StructType(devColumns)
scala> val devSchema = StructType(devColumns)
14、重新创建devDF Dataframe,这次使用新的模式。
pyspark> devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")
15、查看新Dataframe的模式和数据,并确认release_dt列类型现在是时间戳。
16、既然设备数据使用了正确的模式,那么就以Parquet格式写入数据,这会自动嵌入模式。将Parquet数据文件保存到HDFS的“/devsh_loudacre/devices_parquet”目录下。
17、可选:在单独的终端窗口中,使用parquet-tools查看已保存文件的模式。请先下载HDFS目录(或单个文件),再执行该命令。
$ hdfs dfs -get /devsh_loudacre/devices_parquet /tmp/
$ parquet-tools schema /tmp/devices_parquet/
注意,release_dt列的类型被标注为int96;这就是Spark在Parquet中表示时间戳类型的方式。有关拼花工具的更多信息,请运行拼花工具——help。
18、使用保存在devices_parquet中的Parquet文件创建一个新的Dataframe并查看它的模式。注意,Spark能够从Parquet的嵌入式模式中正确推断release_dt列的时间戳类型。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)