- 1 General operations: selecting and filtering
- 1.1 Reading data sources
- 1.1.1 Reading JSON
- 1.1.2 Reading a Hive table
- 1.2 Selecting columns
- 1.3 The filter operator (filter is equivalent to where)
- 2 Aggregation operations: groupBy and agg
- 2.1 The sort operator (sort is equivalent to orderBy)
- 2.2 The grouping function groupBy
- 2.2.1 Counting within groups
- 2.2.2 Max, min, average, and sum within groups
- 2.2.3 Computing several aggregates (max, average, etc.) at once with groupBy + agg
- 2.2.4 Aliasing aggregated columns
- 2.2.5 Pivoting rows to columns with pivot
- 2.3 Case study
- 3 Multi-table operations: join
- 3.1 Data preparation
- 3.2 The join operator
- 3.2.1 Inner join
- 3.2.2 Other join types: just replace inner with the type you need
- 4 Single-table operations: adding, deleting, and modifying columns, and handling null values
1 General operations: selecting and filtering

1.1 Reading data sources

1.1.1 Reading JSON
Use spark.read. Note: paths are resolved against HDFS by default; to read a local file, add the file:// prefix, as shown below.
```scala
scala> val people = spark.read.format("json").load("file:///opt/software/data/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> people.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
```
spark.read.format("json").load("file:///opt/software/data/people.json") is equivalent to spark.read.json("file:///opt/software/data/people.json").

To read a file in another format, just change format("json") accordingly, e.g. format("parquet").
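For instance, a minimal sketch of reading a Parquet file; the path here is a made-up example, not one from the dataset above:

```scala
// Both forms load the same Parquet file; the path is hypothetical.
val dfA = spark.read.format("parquet").load("file:///opt/software/data/people.parquet")
val dfB = spark.read.parquet("file:///opt/software/data/people.parquet")
```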
1.1.2 Reading a Hive table

Use spark.sql. Here the Hive database is default (the default database name can be omitted) and the table is people.
```scala
scala> val peopleDF = spark.sql("select * from default.people")
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)
```

1.2 Selecting columns
The three ways to select columns are shown below.
```scala
scala> peopleDF.select("name","age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+

scala> peopleDF.select($"name",$"age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+

scala> peopleDF.select(peopleDF.col("name"),peopleDF.col("age")).show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
```
Note: if you write the code in IDEA and use $, you must add the statement import spark.implicits._, otherwise the $ expression will not compile. spark-shell already imports it by default.
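A minimal sketch of what that looks like in a standalone application; the object name and master setting are illustrative, not from the original:

```scala
import org.apache.spark.sql.SparkSession

object SelectExample {
  def main(args: Array[String]): Unit = {
    // appName and master are placeholders; configure them for your cluster.
    val spark = SparkSession.builder()
      .appName("SelectExample")
      .master("local[*]")
      .getOrCreate()

    // Required for the $"colName" syntax; spark-shell does this for you.
    import spark.implicits._

    val peopleDF = spark.read.json("file:///opt/software/data/people.json")
    peopleDF.select($"name", $"age").show()

    spark.stop()
  }
}
```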
$"colName" is syntactic sugar that returns a Column object.

DF.col("id") is equivalent to $"id"; both select the column by name (the latter returns a ColumnName).
DF.filter("name=''") 过滤name等于空的行
DF.filter($"age" > 21).show() 过滤age大于21的行,必须增加语句:import spark.implicits._,否则$表达式会报错
DF.filter($"age" === 21) 取等于时必须用===,否则报错,对应的不等于是=!=。等价于DF.filter("age=21")
DF.filter("substring(name,0,1) = 'M'").show 显示name以M开头的行,其中substring是functions.scala,functions.scala包含很多函数方法,等价于DF.filter("substr(name,0,1) = 'M'").show
```scala
scala> peopleDF.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)

scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.filter($"name" === "wangwu").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+

scala> peopleDF.filter($"name" =!= "wangwu").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.filter("age > 30").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+

scala> peopleDF.filter($"age" > 30).show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
```

2 Aggregation operations: groupBy and agg

2.1 The sort operator (sort is equivalent to orderBy)
DF.sort(DF.col("id").desc).show sorts by the column id in descending order; this is how you specify the sort direction. You can also sort on several columns at once; see the sketch below.

This is equivalent to DF.sort($"id".desc).show.

DF.sort is equivalent to DF.orderBy.
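A minimal sketch of a multi-column sort, assuming the peopleDF used throughout:

```scala
// Sort by address ascending, then by age descending within each address.
peopleDF.sort($"address".asc, $"age".desc).show()
```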
```scala
scala> peopleDF.sort($"age").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+

scala> peopleDF.sort($"age".desc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|zhangsan| 22| chengdu|
+--------+---+--------+

scala> peopleDF.sort($"age".asc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+

scala> peopleDF.orderBy($"age".asc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
|  wangwu| 33| beijing|
+--------+---+--------+

scala> peopleDF.orderBy($"age".desc).show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|zhangsan| 22| chengdu|
+--------+---+--------+
```

2.2 The grouping function groupBy

2.2.1 Counting within groups
The operator equivalent of select address, count(1) from people group by address; is shown below.
```scala
scala> peopleDF.show()
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
|xiaoming| 28| beijing|
|      mm| 21| chengdu|
|xiaoming| 18| beijing|
|      mm| 11| chengdu|
+--------+---+--------+

scala> peopleDF.groupBy("address").count().show
+--------+-----+
| address|count|
+--------+-----+
| beijing|    3|
| chengdu|    3|
|shanghai|    1|
+--------+-----+
```

2.2.2 Max, min, average, and sum within groups
```scala
// Equivalent to: select address, max(age) from people group by address;
scala> peopleDF.groupBy("address").max("age").show
+--------+--------+
| address|max(age)|
+--------+--------+
| beijing|      33|
| chengdu|      22|
|shanghai|      28|
+--------+--------+

// Equivalent to: select address, avg(age) from people group by address;
scala> peopleDF.groupBy("address").avg("age").show
+--------+------------------+
| address|          avg(age)|
+--------+------------------+
| beijing|26.333333333333332|
| chengdu|              18.0|
|shanghai|              28.0|
+--------+------------------+

// Equivalent to: select address, min(age) from people group by address;
scala> peopleDF.groupBy("address").min("age").show
+--------+--------+
| address|min(age)|
+--------+--------+
| beijing|      18|
| chengdu|      11|
|shanghai|      28|
+--------+--------+

// Equivalent to: select address, sum(age) from people group by address;
scala> peopleDF.groupBy("address").sum("age").show
+--------+--------+
| address|sum(age)|
+--------+--------+
| beijing|      79|
| chengdu|      54|
|shanghai|      28|
+--------+--------+
```

2.2.3 Computing several aggregates (max, average, etc.) at once with groupBy + agg
```scala
// Equivalent to: select address, count(age), max(age), min(age), avg(age), sum(age) from people group by address;
scala> peopleDF.groupBy("address").agg(count("age"),max("age"),min("age"),avg("age"),sum("age")).show
+--------+----------+--------+--------+------------------+--------+
| address|count(age)|max(age)|min(age)|          avg(age)|sum(age)|
+--------+----------+--------+--------+------------------+--------+
| beijing|         3|      33|      18|26.333333333333332|      79|
| chengdu|         3|      22|      11|              18.0|      54|
|shanghai|         1|      28|      28|              28.0|      28|
+--------+----------+--------+--------+------------------+--------+
```

2.2.4 Aliasing aggregated columns
```scala
scala> peopleDF.groupBy("address").agg(count("age").as("cnt"),avg("age").as("avg")).show
+--------+---+------------------+
| address|cnt|               avg|
+--------+---+------------------+
| beijing|  3|26.333333333333332|
| chengdu|  3|              18.0|
|shanghai|  1|              28.0|
+--------+---+------------------+
```

2.2.5 Pivoting rows to columns with pivot
```scala
// Average age of users with the same name at the same address.
// The distinct values of name become the column names.
scala> peopleDF.groupBy("address").pivot("name").avg("age").show
+--------+----+----+------+--------+--------+
| address|lisi|  mm|wangwu|xiaoming|zhangsan|
+--------+----+----+------+--------+--------+
| beijing|null|null|  33.0|    23.0|    null|
| chengdu|null|16.0|  null|    null|    22.0|
|shanghai|28.0|null|  null|    null|    null|
+--------+----+----+------+--------+--------+
```
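When you already know which pivot values you want, you can pass them explicitly; this variant of pivot skips the extra pass Spark otherwise needs to discover the distinct values. A minimal sketch, restricting the output to three of the names above:

```scala
// Listing the pivot values up front avoids the distinct-value scan
// and fixes the output columns to exactly these names.
peopleDF.groupBy("address").pivot("name", Seq("lisi", "mm", "wangwu")).avg("age").show()
```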
2.3 Case study

3 Multi-table operations: join

3.1 Data preparation

First build two DataFrames:
```scala
scala> val df1 = spark.createDataset(Seq(("a", 1, 2), ("b", 2, 3))).toDF("k1","k2","k3")
df1: org.apache.spark.sql.DataFrame = [k1: string, k2: int ... 1 more field]

scala> val df2 = spark.createDataset(Seq(("a", 2, 2), ("b", 3, 3), ("b", 2, 1), ("c", 1, 1))).toDF("k1","k2","k4")
df2: org.apache.spark.sql.DataFrame = [k1: string, k2: int ... 1 more field]

scala> df1.show
+---+---+---+
| k1| k2| k3|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
+---+---+---+

scala> df2.show
+---+---+---+
| k1| k2| k4|
+---+---+---+
|  a|  2|  2|
|  b|  3|  3|
|  b|  2|  1|
|  c|  1|  1|
+---+---+---+
```

3.2 The join operator
join has two commonly used signatures, shown below. Note that every column listed in usingColumns must exist in both DataFrames.

joinType defaults to inner and must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
```scala
def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
```

3.2.1 Inner join
```scala
// select * from df1 join df2 on df1.k1 = df2.k1

// Method 1
scala> df1.join(df2,"k1").show
+---+---+---+---+---+
| k1| k2| k3| k2| k4|
+---+---+---+---+---+
|  a|  1|  2|  2|  2|
|  b|  2|  3|  2|  1|
|  b|  2|  3|  3|  3|
+---+---+---+---+---+

// Method 2
scala> df1.join(df2,df1("k1") === df2("k1")).show
+---+---+---+---+---+---+
| k1| k2| k3| k1| k2| k4|
+---+---+---+---+---+---+
|  a|  1|  2|  a|  2|  2|
|  b|  2|  3|  b|  2|  1|
|  b|  2|  3|  b|  3|  3|
+---+---+---+---+---+---+

// Method 3
scala> df1.join(df2,df1("k1") === df2("k1"),"inner").show
+---+---+---+---+---+---+
| k1| k2| k3| k1| k2| k4|
+---+---+---+---+---+---+
|  a|  1|  2|  a|  2|  2|
|  b|  2|  3|  b|  2|  1|
|  b|  2|  3|  b|  3|  3|
+---+---+---+---+---+---+

// Joining on different columns
// select * from df1 join df2 on df1.k2 = df2.k4
scala> df1.join(df2,df1("k2") === df2("k4"),"inner").show
+---+---+---+---+---+---+
| k1| k2| k3| k1| k2| k4|
+---+---+---+---+---+---+
|  a|  1|  2|  c|  1|  1|
|  a|  1|  2|  b|  2|  1|
|  b|  2|  3|  a|  2|  2|
+---+---+---+---+---+---+

// Joining on multiple columns
// select * from df1 join df2 on df1.k1 = df2.k1 and df1.k2 = df2.k2
scala> df1.join(df2,Seq("k1","k2"),"inner").show
+---+---+---+---+
| k1| k2| k3| k4|
+---+---+---+---+
|  b|  2|  3|  1|
+---+---+---+---+
```

3.2.2 Other join types: just replace inner with the type you need
```scala
scala> df1.join(df2,Seq("k1"),"left").show
+---+---+---+---+---+
| k1| k2| k3| k2| k4|
+---+---+---+---+---+
|  a|  1|  2|  2|  2|
|  b|  2|  3|  2|  1|
|  b|  2|  3|  3|  3|
+---+---+---+---+---+

// Left outer join; left_outer can be shortened to left
scala> df1.join(df2,Seq("k1"),"left_outer").show
+---+---+---+---+---+
| k1| k2| k3| k2| k4|
+---+---+---+---+---+
|  a|  1|  2|  2|  2|
|  b|  2|  3|  2|  1|
|  b|  2|  3|  3|  3|
+---+---+---+---+---+

// Left semi join
scala> df1.join(df2,Seq("k1"),"leftsemi").show
+---+---+---+
| k1| k2| k3|
+---+---+---+
|  a|  1|  2|
|  b|  2|  3|
+---+---+---+

scala> df1.join(df2,Seq("k1","k2"),"left").show
+---+---+---+----+
| k1| k2| k3|  k4|
+---+---+---+----+
|  a|  1|  2|null|
|  b|  2|  3|   1|
+---+---+---+----+

scala> df1.join(df2,Seq("k1"),"right").show
+---+----+----+---+---+
| k1|  k2|  k3| k2| k4|
+---+----+----+---+---+
|  a|   1|   2|  2|  2|
|  b|   2|   3|  3|  3|
|  b|   2|   3|  2|  1|
|  c|null|null|  1|  1|
+---+----+----+---+---+
```
left is shorthand for left_outer.
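The remaining types from the list above work the same way. A minimal sketch with the same df1 and df2: full_outer keeps unmatched rows from both sides, and left_anti keeps only the df1 rows with no match in df2.

```scala
// Full outer join: unmatched rows on either side are padded with nulls.
df1.join(df2, Seq("k1"), "full_outer").show()

// Left anti join: rows of df1 whose k1 has no match in df2.
df1.join(df2, Seq("k1"), "left_anti").show()
```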