- 大多数Dataframe的转换(transformations)需要你指定一个或多个列
- select(column1, column2, …)
- orderBy(column1, column2, …)
- 对于许多简单的查询(queries),只需将列名指定为字符串
- peopleDF.select(“firstName”,“lastName”)
- 有些类型的转换(transformations)使用列引用或列表达式而不是列名字符串
- 在Python中,有两种相同的方法来引用列
peopleDF = spark.read.option("header","true").csv("people.csv") peopleDF['age'] Column peopleDF.age Column peopleDF.select(peopleDF.age).show() +---+ |age| +---+ | 52| | 32| | 28|3、示例:列引用(Scala)
- 在Scala中,同样有两种方法来引用列
- 第一:使用Dataframe的列名
- 第二:仅使用列名,直到在转换(transformations)中使用时才完全解析
val peopleDF = spark.read.option("header","true").csv("people.csv") peopleDF("age") org.apache.spark.sql.Column = age $"age" org.apache.spark.sql.ColumnName = age peopleDF.select(peopleDF("age")).show +---+ |age| +---+ | 52| | 32|4、列表达式
- 创建列表达式时,使用列引用而不是简单的字符串
- 列 *** 作包括:
- 算术运算符,如+、-、%、/和*
- 比较和逻辑运算符,如>、<、&&和||
- “等于” 比较在Scala中是"= = =",在Python中是"=="
- 字符串函数,如contains, like和substring
- 数据测试函数,如isNull, isNotNull和NaN(不是一个数字)
- 排序函数,如asc和desc
- 仅在sort或者orderBy中使用时有效
- 有关 *** 作符和函数的完整列表,请参阅API文档中的Column
peopleDF.select("lastName", peopleDF.age * 10).show() +--------+----------+ |lastName|(age * 10)| +--------+----------+ | Hopper| 520| | Turing| 320| peopleDF.where(peopleDF.firstName.startswith("A")).show() +-----+--------+---------+---+ |pcode|lastName|firstName|age| +-----+--------+---------+---+ |94020| Turing| Alan| 32| |94020|Lovelace| Ada| 28| +-----+--------+---------+---+6、示例:列表达式(Scala)
peopleDF.select($"lastName", $"age" * 10).show +--------+----------+ |lastName|(age * 10)| +--------+----------+ | Hopper| 520| | Turing| 320| peopleDF.where(peopleDF("firstName").startsWith("A")).show +-----+--------+---------+---+ |pcode|lastName|firstName|age| +-----+--------+---------+---+ |94020| Turing| Alan| 32| |94020|Lovelace| Ada| 28| +-----+--------+---------+---+7、列别名(1)
- 使用列别名函数重命名结果集中的列
- name是别名(alias)的同义词
- 示例(Python):针对列名(age * 10)使用新别名age_10
peopleDF.select("lastName",(peopleDF.age * 10).alias("age_10")).show() +--------+------+ |lastName|age_10| +--------+------+ | Hopper| 520| | Turing| 320|
- 示例(Scala):针对列名(age * 10)使用新别名age_10
peopleDF.select($"lastName",($"age" * 10).alias("age_10")).show +--------+------+ |lastName|age_10| +--------+------+ | Hopper| 520| | Turing| 320|二、Queries的分组(Group)和聚合(Aggregation) 1、Queries的聚合(Aggregation)
- 聚合查询对一组值执行计算,并返回单个值
- 若要对一组分组值执行聚合,需要将groupBy与聚合函数组合使用
- 例子:每个邮政编码有多少人?
peopleDF.groupBy("pcode").count().show() +-----+-----+ |pcode|count| +-----+-----+ |94020| 2| |87501| 1| |02134| 2| +-----+-----+2、转换(Transformation)中的groupBy
- groupBy接受一个或多个列名或引用
- 在Scala中,返回一个RelationalGroupedDataset对象
- 在Python中,返回GroupedData对象
- 返回的对象提供了聚合函数,包括:
- count
- max and min
- mean (and its alias avg)
- sum
- pivot(行转列)
- agg(使用附加聚合函数进行聚合)
- 函数对象提供了几个附加的聚合函数
- 聚合函数包括:
- first/last:返回组中的第一个或最后一个项
- countDistinct:返回组中唯一项(去重后)的数目
- approx_count_distinct:返回唯一项(去重后)的近似计数
- Much faster than a full count
- stddev:计算一组值的标准偏差
- var_sample/var_pop:计算一组值的方差
- covar_samp/covar_pop:计算一组值的样本和总体协方差
- corr:返回一组值的相关性
python:
from pyspark.sql.functions import stddev peopleDF.groupBy("pcode").agg(stddev("age")).show() +-----+------------------+ |pcode| stddev_samp(age)| +-----+------------------+ |94020|0.7071067811865476| |87501| NaN| |02134|2.1213203435596424| +-----+------------------+三、Joining Dataframes 1、Joining Dataframes
- 使用转换(transformation)中的join来连接两个dataframe
- Dataframes支持几种类型的连接
- inner (default)
- outer
- left_outer
- right_outer
- leftsemi
- 转换(transformation)中的crossJoin将一个Dataframe的每个元素与另一个Dataframe的每个元素连接起来
Scala:
val peopleDF = spark.read.option("header","true").csv("people-no-pcode.csv") val pcodesDF = spark.read.option("header","true").csv("pcodes.csv")
python:
peopleDF.join(pcodesDF, "pcode").show() +-----+--------+---------+---+---------+-----+ |pcode|lastName|firstName|age| city|state| +-----+--------+---------+---+---------+-----+ |02134| Hopper| Grace| 52| Boston| MA| |94020|Lovelace| Ada| 28|Palo Alto| CA| |87501| Babbage| Charles| 49| Santa Fe| NM| |02134| Wirth| Niklaus| 48| Boston| MA| +-----+--------+---------+---+---------+-----+3、示例:一个Left Outer Join
- 需要指定连接类型为inner(默认)、outer、left_outer、right_outer或leftsemi
python:
peopleDF.join(pcodesDF, "pcode", "left_outer").show() +-----+--------+---------+---+---------+-----+ |pcode|lastName|firstName|age| city|state| +-----+--------+---------+---+---------+-----+ |02134| Hopper| Grace| 52| Boston| MA| | null| Turing| Alan| 32| null| null| |94020|Lovelace| Ada| 28|Palo Alto| CA| |87501| Babbage| Charles| 49| Santa Fe| NM| |02134| Wirth| Niklaus| 48| Boston| MA| +-----+--------+---------+---+---------+-----+
Scala:
peopleDF.join(pcodesDF,peopleDF("pcode") === pcodesDF("pcode"),"left_outer").show +-----+--------+---------+---+---------+-----+ |pcode|lastName|firstName|age| city|state| +-----+--------+---------+---+---------+-----+ |02134| Hopper| Grace| 52| Boston| MA| | null| Turing| Alan| 32| null| null| |94020|Lovelace| Ada| 28|Palo Alto| CA| |87501| Babbage| Charles| 49| Santa Fe| NM| |02134| Wirth| Niklaus| 48| Boston| MA| +-----+--------+---------+---+---------+-----+4、示例:对名称不同的列进行连接
- 当连接列的名称不同时,请使用列表达式
- 结果包括两个连接列
Scala:
peopleDF.join(zcodesDF, $"pcode" === $"zip").show +-----+--------+---------+---+-----+---------+-----+ |pcode|lastName|firstName|age| zip| city|state| +-----+--------+---------+---+-----+---------+-----+ |02134| Hopper| Grace| 52|02134| Boston| MA| |94020|Lovelace| Ada| 28|94020|Palo Alto| CA| |87501| Babbage| Charles| 49|87501| Santa Fe| NM| |02134| Wirth| Niklaus| 48|02134| Boston| MA| +-----+--------+---------+---+-----+---------+-----+
python:
peopleDF.join(zcodesDF,peopleDF.pcode == zcodesDF.zip).show() +-----+--------+---------+---+-----+---------+-----+ |pcode|lastName|firstName|age| zip| city|state| +-----+--------+---------+---+-----+---------+-----+ |02134| Hopper| Grace| 52|02134| Boston| MA| |94020|Lovelace| Ada| 28|94020|Palo Alto| CA| |87501| Babbage| Charles| 49|87501| Santa Fe| NM| |02134| Wirth| Niklaus| 48|02134| Boston| MA| +-----+--------+---------+---+-----+---------+-----+四、基本要点
- Dataframe中的列可以通过名称或Column对象指定
- 可以使用列 *** 作符定义列表达式
- 使用groupBy和聚合函数计算行组的聚合值
- 使用join *** 作连接两个dataframe
- 支持inner, left outer, right outer and semi joins
1、可选:查看Column类(在Python模块pyspark中)的API文档。sql和Scala包org.apache.spark.sql)。注意各种可用的选项。
2、在终端中启动Spark shell(如果还没有运行)。
3、基于Hive devsh,新建一个名为accountsDF的Dataframe。accounts表。
4、尝试使用select进行一个简单的查询,使用两种列引用语法。
pyspark> accountsDF.select(accountsDF["first_name"]).show()
pyspark> accountsDF.select(accountsDF.first_name).show()
scala> accountsDF.select(accountsDF("first_name")).show
scala> accountsDF.select($"first_name").show
5、要研究列表达式,请基于accountsDF Dataframe中的first_name列创建一个要使用的列对象。
pyspark> fnCol = accountsDF.first_name
scala> val fnCol = accountsDF("first_name")
6、注意,对象类型是Column。要查看可用的方法和属性,请使用制表符补全—即输入fnCol。其次是选项卡。
7、当您对现有列执行 *** 作时,将创建New Column对象。在上面创建的fnCol对象上使用相等 *** 作符,根据一个列表达式创建一个新的Column对象,该列表达式标识名为Lucy的用户。
pyspark> lucyCol = (fnCol == "Lucy")
scala> val lucyCol = (fnCol === "Lucy")
8、在选择语句中使用lucyCol列表达式。因为lucyCol是基于布尔表达式的,所以列值将是true或false,具体取决于first_name列的值。确认以true标识名为Lucy的用户。
pyspark> accountsDF.select(accountsDF.first_name,accountsDF.last_name,lucyCol).show()
scala> accountsDF.select($"first_name",$"last_name",lucyCol).show
9、where *** 作需要一个基于布尔的列表达式。在where转换中使用lucyCol列表达式,并在生成的Dataframe中查看数据。确认数据中只有名为Lucy的用户。
> accountsDF.where(lucyCol).show(5)
10、列表达式不需要赋值给变量。尝试不使用lucyCol变量的相同查询。
pyspark> accountsDF.where(fnCol == "Lucy").show(5)
scala> accountsDF.where(fnCol === "Lucy").show(5)
11、列表达式不限于上面的 *** 作。它们可以用于任何可以使用简单列的转换,例如选择。尝试选择城市和州列,以及phone_number列的前三个字符(在美国,电话号码的前三个数字被称为地区代码)。在phone_number列上使用substr *** 作符提取区域代码。
pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3)).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3)).show(5)
12、注意,在最后一步中,查询返回的值是正确的,但是列名是子字符串(phone_number, 1, 3),它很长,很难处理。重复相同的查询,使用别名 *** 作符将该列重命名为area_code。
pyspark> accountsDF.select("city","state",accountsDF.phone_number.substr(1,3).alias("area_code")).show(5)
scala> accountsDF.select($"city",$"state",$"phone_number".substr(1,3).alias("area_code")).show(5)
13、执行一个查询,结果是只包含first_name和last_name列的Dataframe,并且只包含名和姓都以相同的两个字母开头的用户。(例如,用户Roberta Roget将被包括在内,因为她的姓和名都以“Ro”开头。)
2、按名称分组和计数14、使用带有count的groupBy查询accountsDF Dataframe,以找出共享每个姓氏的总人数。(注意计数聚合转换返回一个Dataframe,不像计数Dataframe *** 作,它返回一个单一的值给驱动程序。)
pyspark> accountsDF.groupBy("last_name").count().show(5)
scala> accountsDF.groupBy("last_name").count.show(5)
15、还可以按多列分组。再次查询accountsDF,这一次计算拥有相同姓和名的人数。
pyspark> accountsDF.groupBy("last_name","first_name").count().show(5)
scala> accountsDF.groupBy("last_name","first_name").count.show(5)3、通过邮政编码将帐户数据与蜂窝塔连接
16、在本节中,您将将一直使用的帐户数据与位于base_stations中的蜂窝基站位置数据连接起来。镶木地板文件。首先检查模式和数据的一些记录。在单独的终端窗口(不是运行Spark shell的终端窗口)中使用parquet-tools命令。
$ parquet-tools schema $DEVDATA/base_stations.parquet
$ parquet-tools head $DEVDATA/base_stations.parquet
17、上传数据文件到HDFS。
$ hdfs dfs -put $DEVDATA/base_stations.parquet /devsh_loudacre/
18、在Spark shell中,使用基站数据创建一个名为baseDF的新Dataframe。检查baseDF模式和数据,确保它与Parquet文件中的数据匹配。
scala> val baseDF = spark.read.parquet("/devsh_loudacre/base_stations.parquet")
19、一些账户持有人居住在有基站的邮政编码地区。加入baseDF和accountsDF以找到这些用户。对于每个用户,包括他们的帐户ID、名、姓,以及他们的邮政编码中基站的ID和位置数据(纬度和经度)。
pyspark> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF, baseDF.zip == accountsDF.zipcode).show()
scala> accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF,$"zip" === $"zipcode").show()4、计数活动设备
20、accountdevice CSV数据集包含所有帐户使用的所有设备的列表。数据集中的每一行包括行ID、账户ID、设备类型的ID、设备被激活的日期和具体的设备ID。CSV数据文件在$DEVDATA/accountdevice目录下。查看数据集中的数据,然后将该目录及其内容上传到HDFS目录/ devsh_loudacre/accountdevice。
21、基于accountdevice数据文件创建Dataframe。
22、使用帐户设备数据和之前在本练习中创建的Dataframes来查找所有活动帐户(即尚未关闭的帐户)中每个设备模型的总数。新的Dataframe应该从最常用的模型到最不常用的模型进行排序。将数据保存为Parquet文件,保存在/devsh_loudacre/top_devices目录下,列如下:
提示:
- 活动账户是账户表中acct_close_dt(账户关闭日期)为空值的账户。
- 设备账户数据中的account_id列对应于accounts表中的acct_num列。
- 设备账户数据中的device_id列对应/devsh_loudacre/devices中已知设备列表中的devnum列。json文件。
- 当你计数设备时,使用withcolumnrename将计数列重命名为active_num。(count列名有二义性,因为它既是函数又是列。)
- 完成这个练习的查询有点复杂,包括一系列的转换。您可能希望将变量分配给由构成查询的转换产生的中间dataframe,以使代码更容易使用和调试。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)