Cloudera系列(2)使用数据帧(DataFrame)和模式(Schemas)

Cloudera系列(2)使用数据帧(DataFrame)和模式(Schemas),第1张

Cloudera系列(2)使用数据帧(DataFrame)和模式(Schemas) 一、从Data Sources创建Dataframes 1、Dataframe的数据源
  • Dataframes从数据源读取数据,并将数据写入数据源
  • Spark SQL支持广泛的数据源类型和格式
    • Text files
      • CSV, JSON, plain text
    • Binary format files(二进制格式文件)
      • Apache Parquet, Apache ORC, Apache Avro data format
    • Tables
      • Hive metastore, JDBC
    • Cloud
      • Such as Amazon S3 and Microsoft ADLS
  • 也可以使用自定义或第三方数据源类型
2、Dataframes and Apache Parquet Files
  • Parquet是Dataframe数据的一种非常常见的文件格式
  • Parquet的特点
    • 优化的二进制存储的结构化数据
    • 架构元数据被嵌入到文件中
    • 高效的性能和大数据量
    • 许多Hadoop生态系统工具支持
      • Spark、Hadoop MapReduce、Hive等
  • 使用parquet-tools查看Parquet文件模式和数据

(1)使用head显示前几条记录

$ parquet-tools head mydatafile.parquet

(2)使用schema查看模式

$ parquet-tools schema mydatafile.parquet
3、从数据源创建Dataframe
  • spark.read返回一个DataframeReader对象
  • 使用DataframeReader设置来指定如何从数据中加载数据源
    • format:表示数据源类型,如csv、json、parquet等(默认为parquet)
    • option:指定底层数据源的键/值设置
    • schema:指定要使用的模式,而不是从数据源推断一个模式
  • 基于数据源创建Dataframe
    • 从一个或多个文件中加载数据
4、示例:从数据源创建Dataframe
  • 读取CSV文本文件
    • 将文件中的第一行视为标题,而不是数据

python:

myDF = spark.read.format("csv").option("header","true").load("/loudacre/myFile.csv")
  • 读取Avro文件

python:

myDF = spark.read.format("avro").load("/loudacre/myData.avro")
5、DataframeReader便捷函数
  • 可以为某些格式调用特定格式的加载函数
    • 代替设置格式和使用加载的快捷方式
  • 下面两个代码示例是相同的,常规写法与简洁写法
spark.read.format("csv").load("/loudacre/myFile.csv")
spark.read.csv("/loudacre/myFile.csv")
6、指定数据源文件位置
  • 从文件数据源读取时,必须指定位置
    • 位置可以是单个文件、文件列表、目录或通配符
    • 例如
      • spark.read.json(“myfile.json”)
      • spark.read.json(“mydata/”)
      • spark.read.json(“mydata/*.json”)
      • spark.read.json(“myfile1.json”,“myfile2.json”)
  • 文件和目录由绝对或相对URI引用
    • 相对URI(使用默认文件系统)
      • myfile.json
    • 绝对URI
      • hdfs://nnhost/loudacre/myfile.json
      • file:/home/training/myfile.json
7、从Hive Tables中创建Dataframes
  • Apache Hive提供对HDFS中的数据类似数据库的访问
    • 将模式(Schemas)应用于HDFS文件
    • 元数据存储在Hive metastore中
  • Spark可以对Hive tables进行读写 *** 作
    • 从Hive metastore推断Dataframe schema
  • Spark Hive支持必须开启并配置Hive的位置
    • warehouse in HDFS

python:

usersDF = spark.read.table("users")
8、从内存中的数据创建Dataframe
  • 可以从内存中的数据集合创建Dataframe
    • 对测试和集成有用
val mydata = List(("Josiah","Bartlet"),("Harry","Potter"))

val myDF = spark.createDataframe(mydata)

myDF.show
+------+-------+
| _1| _2|
+------+-------+
|Josiah|Bartlet|
| Harry| Potter|
+------+-------+
二、将Dataframes保存到Data Sources 1、关键的DataframeWriter Functions
  • Dataframe write function返回一个DataframeWriter
    • 将数据保存到一个数据源,如表或一组文件
    • 工作原理类似于DataframeReader
  • DataframeWriter方法
    • format指定数据源类型
    • mode决定目录或表是否已经存在
      • error、overwrite、append或ignore(默认为error)
    • partitionBy将数据以表单的形式存储在分区目录中
      column=value(和Hive分区一样)
    • option指定目标数据源的属性
    • save将数据以文件的形式保存在指定的目录下
      • 或者使用json、csv、parquet等
    • saveAsTable将数据保存到Hive metastore表中
      • 基于Hive仓库默认的数据位置
      • 设置path选项以覆盖位置
2、示例:将一个Dataframes保存到Data Sources
  • 向一个名为my_table的Hive metastore table写入数据
    • 如果表已经存在,则追加数据
    • 换个位置
myDF.write.mode("append").option("path","/loudacre/mydata").saveAsTable("my_table")
  • 在mydata目录下以Parquet文件的形式写入数据
myDF.write.save("mydata")
3、保存数据到文件
  • 从Dataframe中保存数据时,必须指定一个目录
    • Spark将数据保存到该目录下的一个或多个部件文件中
 devDF.write.csv("devices")
$ hdfs dfs -ls
 devices
Found 4 items
-rw-r--r-- 3 training training 0 … devices/_SUCCESS
-rw-r--r-- 3 training training 2119 … devices/part-00000-e0fa6381-….csv
-rw-r--r-- 3 training training 2202 … devices/part-00001-e0fa6381-….csv
-rw-r--r-- 3 training training 2333 … devices/part-00002-e0fa6381-….csv
三、Dataframe Schemas 1、Dataframe Schemas
  • 每个Dataframe都有一个相关的模式(schema)
    • 模式(schema)用于定义列的名称和类型
    • 模式(schema)是不可变的并在Dataframe创建时定义
myDF.printSchema()
root
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- age: integer (nullable = true)
  • 当从数据源创建一个新的Dataframe时,模式可以:
    • 自动从数据源推断
    • 以编程方式指定
  • 当Dataframe被转换(transformation)创建时,Spark会基于查询(query)计算新的模式(schema)。
2、Inferred Schemas(推断模式)
  • Spark可以从结构化数据加载模式,例如:
    • Parquet、ORC和Avro数据文件 ——schema被嵌入到文件中
    • Hive table ——schema在Hive metastore中定义
    • Parent Dataframes
  • Spark还可以尝试从半结构化数据源推断模式
    • 例如JSON和CSV
3、示例:推断CSV文件的模式(没有表头)

spark.read.option("inferSchema","true").csv("people.csv").printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: integer (nullable = true)
4、示例:推断CSV文件的模式(有表头)

spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()
root
 |-- pcode: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- age: integer (nullable = true)
5、Inferred Schemas(推断模式)与Manual Schemas(手动模式)
  • 依赖于Spark的自动推断模式的缺点
    • 推断时需要初始文件扫描,这可能需要很长时间
    • 推断出来的模式对您的用例可能不正确
  • 手动定义模式
    • 模式Schemas是一个包含StructField列表的StructType对象
    • 每个StructField表示模式中的一列,指定了:
      • 列的名称
      • 列的数据类型
      • 数据是否可以为空(可选——默认为true)
6、示例:错误的模式推断
  • 如下推断的CSV文件模式是不正确的
    • pcode列应该是一个字符串
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()
root
 |-- pcode: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- age: integer (nullable = true)
7、示例:以编程方式手动定义模式(Python)
from pyspark.sql.types import *
columnsList = [
 StructField("pcode", StringType()),
 StructField("lastName", StringType()),
 StructField("firstName", StringType()),
 StructField("age", IntegerType())]
peopleSchema = StructType(columnsList)
8、示例:以编程方式手动定义模式(Scala)
import org.apache.spark.sql.types._
val columnsList = List(
 StructField("pcode", StringType),
 StructField("lastName", StringType),
 StructField("firstName", StringType),
 StructField("age", IntegerType))
val peopleSchema = StructType(columnsList)
9、示例:应用手动模式
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()
root
 |-- pcode: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- age: integer (nullable = true)
四、Eager and Lazy Execution 1、急于执行和懒惰执行
  • 一旦代码中出现语句, *** 作就会立即执行
  • 当只在结果被引用时才执行 *** 作时, *** 作是惰性的
  • Spark查询执行起来既迟缓又急切
    • Dataframe模式是急切确定的
    • 数据转换被延迟执行
  • 当在一系列转换中调用一个动作时,会触发惰性执行
2、Eager and Lazy Execution(1)

python:

> usersDF = spark.read.json("users.json")

3、Eager and Lazy Execution(2)

python:

> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")


4、Eager and Lazy Execution(3)

python:

> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")
> nameAgeDF.show()


5、Eager and Lazy Execution(4)

python:

> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")
> nameAgeDF.show()
+-------+----+
| name| age|
+-------+----+
| Alice|null|
|Brayden| 30|
| Carla| 19|
…


五、基本要点
  • Dataframes可以从如下几种不同类型的数据源加载和保存
    • CSV和JSON等半结构化文本文件
    • Parquet、Avro和ORC等结构化二进制格式
    • Hive和JDBC表
  • Dataframes可以从数据源中推断出一个模式,或者手动定义一个模式
  • Dataframe schema是在创建时确定的,但是查询是延迟执行的(当一个动作被调用时)
六、实践练习:使用Dataframes和schema 1、基于Hive Table创建Dataframe

1、本练习使用基于devsh Hive数据库中的accounts表的Dataframe。您可以使用Beeline SQL命令行访问Hive来查看模式

$ beeline -u jdbc:hive2://localhost:10000 -e "DESCRIBE devsh.accounts"

2、如果它还没有运行,请启动Spark shell(您可以选择Scala或Python)。

spark-shell

3、使用Hive表devsh.accounts创建一个新的Dataframe。

pyspark> accountsDF = spark.read.table("devsh.accounts")
scala> val accountsDF = spark.read.table("devsh.accounts")

4、打印模式和Dataframe的前几行,注意模式与Hive表的模式是一致的。
5、使用邮政编码为94913的帐户数据行创建一个新的Dataframe,并将结果保存到HDFS目录/devsh_loudacre/ accounts_zip94913中的CSV文件中。您可以在单个命令中完成此 *** 作,如下所示,也可以使用多个命令。

pyspark> accountsDF.where("zipcode = 94913").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")
scala> accountsDF.where("zipcode = '94913'").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")

6、使用hdfs在单独的终端窗口查看hdfs中的/devsh_loudacre/accounts_zip94913目录和其中一个保存文件中的数据。确认CSV文件包含标题行,并且只包含所选邮政编码的记录。
7、可选:尝试基于上面创建的CSV文件创建一个新的Dataframe。比较原始accountsDF和新的Dataframe的模式。有什么不同吗?再次尝试,这次将inferSchema选项设置为true并再次进行比较。

2、定义Dataframe的架构

8、如果您还没有这样做,请查看HDFS文件/devsh_loudacre/devices.json中的数据。
9、基于设备创建一个新的Dataframe。json文件。(该命令在推断模式时可能需要几秒钟时间。)

pyspark> devDF = spark.read.json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.json("/devsh_loudacre/devices.json")

10、查看devDF Dataframe的模式。注意Spark从JSON文件中推断出的列名和类型。特别要注意的是,release_dt列的类型是string,而列中的数据实际上代表一个时间戳。
11、定义正确指定此Dataframe列类型的模式。首先导入包含必要类和类型定义的包。
注:“_”代表全部,与JAVA的“*”一致意思

pyspark> from pyspark.sql.types import *
scala> import org.apache.spark.sql.types._

12、接下来,创建一个StructField对象集合,这些对象表示列定义。release_dt列应该是一个时间戳。

pyspark> devColumns = [StructField("devnum",LongType()),
StructField("make",StringType()),
StructField("model",StringType()),
StructField("release_dt",TimestampType()),
StructField("dev_type",StringType())]
scala> val devColumns = List(
StructField("devnum",LongType),
StructField("make",StringType),
StructField("model",StringType),
StructField("release_dt",TimestampType),
StructField("dev_type",StringType))

13、使用列定义列表创建模式(StructType对象)。

pyspark> devSchema = StructType(devColumns)
scala> val devSchema = StructType(devColumns)

14、重新创建devDF Dataframe,这次使用新的模式。

pyspark> devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")

15、查看新Dataframe的模式和数据,并确认release_dt列类型现在是时间戳。
16、既然设备数据使用了正确的模式,那么就以Parquet格式写入数据,这会自动嵌入模式。将Parquet数据文件保存到HDFS的“/devsh_loudacre/devices_parquet”目录下。
17、可选:在单独的终端窗口中,使用parquet-tools查看已保存文件的模式。请先下载HDFS目录(或单个文件),再执行该命令。

$ hdfs dfs -get /devsh_loudacre/devices_parquet /tmp/ 
$ parquet-tools schema /tmp/devices_parquet/

注意,release_dt列的类型被标注为int96;这就是Spark在Parquet中表示时间戳类型的方式。有关拼花工具的更多信息,请运行拼花工具——help。
18、使用保存在devices_parquet中的Parquet文件创建一个新的Dataframe并查看它的模式。注意,Spark能够从Parquet的嵌入式模式中正确推断release_dt列的时间戳类型。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4677593.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存