Cloudera系列(3)使用DataFrame的Queries分析数据

Cloudera系列(3)使用DataFrame的Queries分析数据,第1张

Cloudera系列(3)使用DataFrame的Queries分析数据 一、使用列表达式查询数据帧(Dataframe) 1、列、列名和列表达式
  • 大多数Dataframe的转换(transformations)需要你指定一个或多个列
    • select(column1, column2, …)
    • orderBy(column1, column2, …)
  • 对于许多简单的查询(queries),只需将列名指定为字符串
    • peopleDF.select(“firstName”,“lastName”)
  • 有些类型的转换(transformations)使用列引用或列表达式而不是列名字符串
2、示例:列引用(Python)
  • 在Python中,有两种相同的方法来引用列
peopleDF = spark.read.option("header","true").csv("people.csv")

peopleDF['age']
Column

peopleDF.age
Column

peopleDF.select(peopleDF.age).show()
+---+
|age|
+---+
| 52|
| 32|
| 28|
3、示例:列引用(Scala)
  • 在Scala中,同样有两种方法来引用列
    • 第一:使用Dataframe的列名
    • 第二:仅使用列名,直到在转换(transformations)中使用时才完全解析
val peopleDF = spark.read.option("header","true").csv("people.csv")

peopleDF("age")
org.apache.spark.sql.Column = age

$"age"
org.apache.spark.sql.ColumnName = age

peopleDF.select(peopleDF("age")).show
+---+
|age|
+---+
| 52|
| 32|
4、列表达式
  • 创建列表达式时,使用列引用而不是简单的字符串
  • 列 *** 作包括:
    • 算术运算符,如+、-、%、/和*
    • 比较和逻辑运算符,如>、<、&&和||
      • “等于” 比较在Scala中是"= = =",在Python中是"=="
    • 字符串函数,如contains, like和substring
    • 数据测试函数,如isNull, isNotNull和NaN(不是一个数字)
    • 排序函数,如asc和desc
      • 仅在sort或者orderBy中使用时有效
  • 有关 *** 作符和函数的完整列表,请参阅API文档中的Column
5、示例:列表达式(Python)
peopleDF.select("lastName", peopleDF.age * 10).show()
+--------+----------+
|lastName|(age * 10)|
+--------+----------+
| Hopper| 520|
| Turing| 320|

peopleDF.where(peopleDF.firstName.startswith("A")).show()
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|94020| Turing| Alan| 32|
|94020|Lovelace| Ada| 28|
+-----+--------+---------+---+
6、示例:列表达式(Scala)
peopleDF.select($"lastName", $"age" * 10).show
+--------+----------+
|lastName|(age * 10)|
+--------+----------+
| Hopper| 520|
| Turing| 320|

peopleDF.where(peopleDF("firstName").startsWith("A")).show
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|94020| Turing| Alan| 32|
|94020|Lovelace| Ada| 28|
+-----+--------+---------+---+
7、列别名(1)
  • 使用列别名函数重命名结果集中的列
    • name是别名(alias)的同义词
  • 示例(Python):针对列名(age * 10)使用新别名age_10
peopleDF.select("lastName",(peopleDF.age * 10).alias("age_10")).show()
+--------+------+
|lastName|age_10|
+--------+------+
| Hopper| 520|
| Turing| 320|
  • 示例(Scala):针对列名(age * 10)使用新别名age_10
peopleDF.select($"lastName",($"age" * 10).alias("age_10")).show
+--------+------+
|lastName|age_10|
+--------+------+
| Hopper| 520|
| Turing| 320|
二、Queries的分组(Group)和聚合(Aggregation) 1、Queries的聚合(Aggregation)
  • 聚合查询对一组值执行计算,并返回单个值
  • 若要对一组分组值执行聚合,需要将groupBy与聚合函数组合使用
  • 例子:每个邮政编码有多少人?
peopleDF.groupBy("pcode").count().show()
+-----+-----+ 
|pcode|count|
+-----+-----+
|94020| 2|
|87501| 1|
|02134| 2|
+-----+-----+
2、转换(Transformation)中的groupBy
  • groupBy接受一个或多个列名或引用
    • 在Scala中,返回一个RelationalGroupedDataset对象
    • 在Python中,返回GroupedData对象
  • 返回的对象提供了聚合函数,包括:
    • count
    • max and min
    • mean (and its alias avg)
    • sum
    • pivot(行转列)
    • agg(使用附加聚合函数进行聚合)
3、附加聚合函数
  • 函数对象提供了几个附加的聚合函数
  • 聚合函数包括:
    • first/last:返回组中的第一个或最后一个项
    • countDistinct:返回组中唯一项(去重后)的数目
    • approx_count_distinct:返回唯一项(去重后)的近似计数
      • Much faster than a full count
    • stddev:计算一组值的标准偏差
    • var_sample/var_pop:计算一组值的方差
    • covar_samp/covar_pop:计算一组值的样本和总体协方差
    • corr:返回一组值的相关性
4、示例:使用函数对象

python:

from pyspark.sql.functions import stddev

peopleDF.groupBy("pcode").agg(stddev("age")).show()
+-----+------------------+ 
|pcode| stddev_samp(age)|
+-----+------------------+
|94020|0.7071067811865476|
|87501| NaN|
|02134|2.1213203435596424|
+-----+------------------+
三、Joining Dataframes 1、Joining Dataframes
  • 使用转换(transformation)中的join来连接两个dataframe
  • Dataframes支持几种类型的连接
    • inner (default)
    • outer
    • left_outer
    • right_outer
    • leftsemi
  • 转换(transformation)中的crossJoin将一个Dataframe的每个元素与另一个Dataframe的每个元素连接起来
2、示例:一个简单的Inner Join


Scala:

val peopleDF = spark.read.option("header","true").csv("people-no-pcode.csv")

val pcodesDF = spark.read.option("header","true").csv("pcodes.csv")

python:

peopleDF.join(pcodesDF, "pcode").show()
+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age| city|state|
+-----+--------+---------+---+---------+-----+
|02134| Hopper| Grace| 52| Boston| MA|
|94020|Lovelace| Ada| 28|Palo Alto| CA|
|87501| Babbage| Charles| 49| Santa Fe| NM|
|02134| Wirth| Niklaus| 48| Boston| MA|
+-----+--------+---------+---+---------+-----+
3、示例:一个Left Outer Join
  • 需要指定连接类型为inner(默认)、outer、left_outer、right_outer或leftsemi

python:

peopleDF.join(pcodesDF, "pcode", "left_outer").show()
+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age| city|state|
+-----+--------+---------+---+---------+-----+
|02134| Hopper| Grace| 52| Boston| MA|
| null| Turing| Alan| 32| null| null|
|94020|Lovelace| Ada| 28|Palo Alto| CA|
|87501| Babbage| Charles| 49| Santa Fe| NM|
|02134| Wirth| Niklaus| 48| Boston| MA|
+-----+--------+---------+---+---------+-----+

Scala:

peopleDF.join(pcodesDF,peopleDF("pcode") === pcodesDF("pcode"),"left_outer").show
+-----+--------+---------+---+---------+-----+
|pcode|lastName|firstName|age| city|state|
+-----+--------+---------+---+---------+-----+
|02134| Hopper| Grace| 52| Boston| MA|
| null| Turing| Alan| 32| null| null|
|94020|Lovelace| Ada| 28|Palo Alto| CA|
|87501| Babbage| Charles| 49| Santa Fe| NM|
|02134| Wirth| Niklaus| 48| Boston| MA|
+-----+--------+---------+---+---------+-----+
4、示例:对名称不同的列进行连接

  • 当连接列的名称不同时,请使用列表达式
    • 结果包括两个连接列

Scala:

peopleDF.join(zcodesDF, $"pcode" === $"zip").show
+-----+--------+---------+---+-----+---------+-----+
|pcode|lastName|firstName|age| zip| city|state|
+-----+--------+---------+---+-----+---------+-----+
|02134| Hopper| Grace| 52|02134| Boston| MA|
|94020|Lovelace| Ada| 28|94020|Palo Alto| CA|
|87501| Babbage| Charles| 49|87501| Santa Fe| NM|
|02134| Wirth| Niklaus| 48|02134| Boston| MA|
+-----+--------+---------+---+-----+---------+-----+

python:

peopleDF.join(zcodesDF,peopleDF.pcode == zcodesDF.zip).show()
+-----+--------+---------+---+-----+---------+-----+
|pcode|lastName|firstName|age| zip| city|state|
+-----+--------+---------+---+-----+---------+-----+
|02134| Hopper| Grace| 52|02134| Boston| MA|
|94020|Lovelace| Ada| 28|94020|Palo Alto| CA|
|87501| Babbage| Charles| 49|87501| Santa Fe| NM|
|02134| Wirth| Niklaus| 48|02134| Boston| MA|
+-----+--------+---------+---+-----+---------+-----+
四、基本要点
  • Dataframe中的列可以通过名称或Column对象指定
    • 可以使用列 *** 作符定义列表达式
  • 使用groupBy和聚合函数计算行组的聚合值
  • 使用join *** 作连接两个dataframe
    • 支持inner, left outer, right outer and semi joins
五、实践练习:使用Dataframe的Queries分析数据 1、使用列表达式查询数据帧

1、可选:查看Column类(在Python模块pyspark中)的API文档。sql和Scala包org.apache.spark.sql)。注意各种可用的选项。
2、在终端中启动Spark shell(如果还没有运行)。
3、基于Hive devsh,新建一个名为accountsDF的Dataframe。accounts表。
4、尝试使用select进行一个简单的查询,使用两种列引用语法。

pyspark> accountsDF.select(accountsDF["first_name"]).show() 
pyspark> accountsDF.select(accountsDF.first_name).show()
scala> accountsDF.select(accountsDF("first_name")).show

scala> accountsDF.select($"first_name").show

5、要研究列表达式,请基于accountsDF Dataframe中的first_name列创建一个要使用的列对象。

pyspark> fnCol = accountsDF.first_name
scala> val fnCol = accountsDF("first_name")

6、注意,对象类型是Column。要查看可用的方法和属性,请使用制表符补全—即输入fnCol。其次是选项卡。
7、当您对现有列执行 *** 作时,将创建New Column对象。在上面创建的fnCol对象上使用相等 *** 作符,根据一个列表达式创建一个新的Column对象,该列表达式标识名为Lucy的用户。

pyspark> lucyCol = (fnCol == "Lucy")
scala> val lucyCol = (fnCol === "Lucy")

8、在选择语句中使用lucyCol列表达式。因为lucyCol是基于布尔表达式的,所以列值将是true或false,具体取决于first_name列的值。确认以true标识名为Lucy的用户。

pyspark> accountsDF.select(accountsDF.first_name,accountsDF.last_name,lucyCol).show()

scala> accountsDF.select($"first_name",$"last_name",lucyCol).show

9、where *** 作需要一个基于布尔的列表达式。在where转换中使用lucyCol列表达式,并在生成的Dataframe中查看数据。确认数据中只有名为Lucy的用户。

> accountsDF.where(lucyCol).show(5)

10、列表达式不需要赋值给变量。尝试不使用lucyCol变量的相同查询。

pyspark> accountsDF.where(fnCol == "Lucy").show(5)
scala> accountsDF.where(fnCol === "Lucy").show(5)

11、列表达式不限于上面的 *** 作。它们可以用于任何可以使用简单列的转换,例如选择。尝试选择城市和州列,以及phone_number列的前三个字符(在美国,电话号码的前三个数字被称为地区代码)。在phone_number列上使用substr *** 作符提取区域代码。

pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3)).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3)).show(5)

12、注意,在最后一步中,查询返回的值是正确的,但是列名是子字符串(phone_number, 1, 3),它很长,很难处理。重复相同的查询,使用别名 *** 作符将该列重命名为area_code。

pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3).alias("area_code")).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3).alias("area_code")).show(5)

13、执行一个查询,结果是只包含first_name和last_name列的Dataframe,并且只包含名和姓都以相同的两个字母开头的用户。(例如,用户Roberta Roget将被包括在内,因为她的姓和名都以“Ro”开头。)

2、按名称分组和计数

14、使用带有count的groupBy查询accountsDF Dataframe,以找出共享每个姓氏的总人数。(注意计数聚合转换返回一个Dataframe,不像计数Dataframe *** 作,它返回一个单一的值给驱动程序。)

pyspark> accountsDF.groupBy("last_name").count().show(5)
scala> accountsDF.groupBy("last_name").count.show(5)


15、还可以按多列分组。再次查询accountsDF,这一次计算拥有相同姓和名的人数。

pyspark> accountsDF.groupBy("last_name","first_name").count().show(5)

scala> accountsDF.groupBy("last_name","first_name").count.show(5)
3、通过邮政编码将帐户数据与蜂窝塔连接

16、在本节中,您将将一直使用的帐户数据与位于base_stations中的蜂窝基站位置数据连接起来。镶木地板文件。首先检查模式和数据的一些记录。在单独的终端窗口(不是运行Spark shell的终端窗口)中使用parquet-tools命令。

$ parquet-tools schema $DEVDATA/base_stations.parquet 

$ parquet-tools head $DEVDATA/base_stations.parquet

17、上传数据文件到HDFS。

$ hdfs dfs -put $DEVDATA/base_stations.parquet /devsh_loudacre/

18、在Spark shell中,使用基站数据创建一个名为baseDF的新Dataframe。检查baseDF模式和数据,确保它与Parquet文件中的数据匹配。

scala> val baseDF = spark.read.parquet("/devsh_loudacre/base_stations.parquet")

19、一些账户持有人居住在有基站的邮政编码地区。加入baseDF和accountsDF以找到这些用户。对于每个用户,包括他们的帐户ID、名、姓,以及他们的邮政编码中基站的ID和位置数据(纬度和经度)。

pyspark> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF, baseDF.zip == accountsDF.zipcode).show()
scala> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF,$"zip" === $"zipcode").show()

4、计数活动设备

20、accountdevice CSV数据集包含所有帐户使用的所有设备的列表。数据集中的每一行包括行ID、账户ID、设备类型的ID、设备被激活的日期和具体的设备ID。CSV数据文件在$DEVDATA/accountdevice目录下。查看数据集中的数据,然后将该目录及其内容上传到HDFS目录/ devsh_loudacre/accountdevice。
21、基于accountdevice数据文件创建Dataframe。
22、使用帐户设备数据和之前在本练习中创建的Dataframes来查找所有活动帐户(即尚未关闭的帐户)中每个设备模型的总数。新的Dataframe应该从最常用的模型到最不常用的模型进行排序。将数据保存为Parquet文件,保存在/devsh_loudacre/top_devices目录下,列如下:


提示:

  • 活动账户是账户表中acct_close_dt(账户关闭日期)为空值的账户。
  • 设备账户数据中的account_id列对应于accounts表中的acct_num列。
  • 设备账户数据中的device_id列对应/devsh_loudacre/devices中已知设备列表中的devnum列。json文件。
  • 当你计数设备时,使用withcolumnrename将计数列重命名为active_num。(count列名有二义性,因为它既是函数又是列。)
  • 完成这个练习的查询有点复杂,包括一系列的转换。您可能希望将变量分配给由构成查询的转换产生的中间dataframe,以使代码更容易使用和调试。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4744757.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-08
下一篇 2022-11-08

发表评论

登录后才能评论

评论列表(0条)

保存