The DSL (Domain-Specific Language) style API expresses SQL semantics through programmatic API calls instead of SQL strings.
DSL: domain-specific language.
The DataSet Table API has one notable trait: after an operation, the result always comes back as a DataFrame,
because after a select the concrete type of the result cannot be inferred, so each row can only be wrapped in the generic Row type (a DataFrame is just Dataset[Row]).
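A minimal sketch of this behavior, assuming a small illustrative case class Stu that is not part of the original data: even a strongly typed Dataset falls back to a DataFrame (Dataset[Row]) once select is applied.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case class, used only to illustrate the point
case class Stu(id: Int, name: String, age: Int)

object RowWrappingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RowWrappingDemo").master("local[*]").getOrCreate()
    import spark.implicits._

    val ds: Dataset[Stu] = Seq(Stu(1, "zss", 18), Stu(2, "ls", 20)).toDS()

    // ds is a typed Dataset[Stu], but select cannot infer a concrete result type
    // for arbitrary column expressions, so it returns Dataset[Row], i.e. a DataFrame
    val selected: DataFrame = ds.select($"id", $"age" + 10)
    selected.printSchema()

    spark.stop()
  }
}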
Basic Table API operations
Data preparation
id,name,age,city,score
1,张三,21,BJ,80.0
2,李四,23,BJ,82.0
3,王五,20,SH,88.6
4,赵六,26,SH,83.0
5,田七,30,SZ,90.0
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI01 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Set up the environment (SparkSession)
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Read the data and create a DataFrame
    val df: DataFrame = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv("SQLData/csv/stu.csv")

    // Import Spark implicits and SQL functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    println("----------------------- select and expressions ----------------------------")
    // Refer to columns by plain string names
    df.select("id", "name").show()
    // Plain string names cannot carry expressions:
    // df.select("id + 1", "name").show()  // "id + 1" is treated as a column name and fails
    // To pass SQL expressions as strings, use selectExpr
    df.selectExpr("id+1", "upper(name)").show()
    // Build Column objects with the $ interpolator
    df.select($"id", upper($"name"), $"age" + 10).show()
    // Build Column objects with the single-quote symbol syntax
    df.select('id, upper('name), 'age + 10).show()
    // Build Column objects with the col() function
    df.select(col("id"), upper(col("name")), col("age") + 10).show()

    println("----------------------- aliases ----------------------------")
    // Plain string names cannot be transformed or aliased
    // df.select("id", "name").show()
    // Use selectExpr for SQL expression strings with aliases
    df.selectExpr("id+1 as new_id", "upper(name) as new_name").show()
    // $ interpolator
    df.select($"id" as "new_id", upper($"name") as "new_name", $"age" + 10 as "new_age").show()
    // Single-quote symbol syntax
    df.select('id as "new_id", upper('name) as "new_name", 'age + 10 as "new_age").show()
    // col() function
    df.select(col("id") as "new_id", upper(col("name")) as "new_name", col("age") + 10 as "new_age").show()

    println("----------------------- filtering ----------------------------")
    df.where("id > 1 and city='BJ'").show()
    df.where('id > 1 and 'score > 80).show()

    println("----------------------- order by ----------------------------")
    df.orderBy($"id".desc).show()   // id descending
    df.orderBy("age").show()        // age ascending

    println("----------------------- group by with aggregate functions ----------------------------")
    df.groupBy("city").count().show()        // number of rows per city
    df.groupBy("city").avg("score").show()   // average score per city
    // Common pattern: agg(sum() as "alias", ...)
    df.groupBy("city").agg(
      avg("score") as "avg_score",
      sum("score") as "sum_score",
      count(lit(1)) as "cnt",               // lit(v) turns a value into a constant column
      collect_list("name") as "names"
    ).show()

    println("----------------------- subquery ----------------------------")
    df.groupBy("city").agg(sum("score") as "sum_score")
      .where($"sum_score" > 150).show()
  }
}
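A small complementary sketch (not part of the original example), continuing with the df defined above: the expr() function from org.apache.spark.sql.functions parses a SQL expression string into a Column, so string expressions can be mixed with Column-based ones inside an ordinary select.

import org.apache.spark.sql.functions.{col, expr, upper}

// expr("...") turns a SQL expression string into a Column object
df.select(expr("id + 1") as "new_id", upper(col("name")) as "new_name").show()
df.withColumn("age_plus_10", expr("age + 10")).show()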
Window function example
Data preparation
shop1,2022-01-01,500
shop1,2022-01-02,500
shop1,2022-02-01,500
shop1,2022-04-01,500
shop1,2022-03-01,500
shop1,2022-06-01,500
shop2,2022-01-01,500
shop2,2022-05-01,500
shop2,2022-02-01,500
shop2,2022-03-01,500
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object TableAPI02 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Set up the environment (SparkSession)
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Read the data and create a DataFrame with a user-defined schema
    val schema: StructType = StructType(
      Seq(
        StructField("name", DataTypes.StringType),
        StructField("date", DataTypes.StringType),
        StructField("amount", DataTypes.IntegerType)
      )
    )
    val df: DataFrame = spark.read.schema(schema).csv("SQLData/shop/shop.csv")

    // Import Spark implicits and SQL functions
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Goal: the monthly total per shop, plus the running cumulative total
    // 1. Monthly total per shop
    val df2: DataFrame = df.groupBy($"name", substring($"date", 0, 7) as "month")
      .agg(sum("amount") as "m_sum_amount")

    // 2. Running cumulative total (default window frame)
    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount") over (Window.partitionBy("name").orderBy("month")) as "total_money"
    ).show()

    // Same result with an explicitly specified row frame
    df2.select('name, 'month, 'm_sum_amount,
      sum("m_sum_amount") over (Window.partitionBy("name").orderBy("month")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)) as "total_money"
    ).show()
  }
}
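A short complementary sketch (not part of the original example), reusing the df2 aggregate computed above: the same partition-and-order window spec also drives ranking functions such as row_number, rank and dense_rank.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, desc, rank, row_number}

// Rank each shop's months by monthly total, highest first
val w = Window.partitionBy("name").orderBy(desc("m_sum_amount"))

df2.select($"name", $"month", $"m_sum_amount",
  row_number() over w as "rn",
  rank() over w as "rk",
  dense_rank() over w as "drk"
).show()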
Join queries and union
Data preparation: user
uid,name,age,gender,city
1,zss,18,M,BJ
2,ls,20,F,BJ
3,wx,30,M,SH
Data preparation: order
oid,money,uid,id
1001,100,1,1
1002,100,2,2
1003,100,3,3
1004,100,1,1
1005,100,2,2
1006,100,3,3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableAPI03 {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Set up the environment (SparkSession)
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()

    // Read the data and create DataFrames
    val userDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/user.csv")
    val orderDF: DataFrame = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/order.csv")

    // Import Spark implicits and SQL functions
    import org.apache.spark.sql.functions._
    import spark.implicits._

    println("----------------------- join queries ---------------------------")
    userDF.crossJoin(orderDF).show()                              // Cartesian product
    userDF.join(orderDF).show()                                   // no join condition: also a Cartesian product
    userDF.join(orderDF, "uid").show()
    userDF.join(orderDF, Seq("uid")).show()
    userDF.join(orderDF, userDF("uid") === orderDF("id")).show()  // use this form when the join columns have different names

    // Outer joins
    userDF.join(orderDF, Seq("uid"), "left").show()
    userDF.join(orderDF, Seq("uid"), "right").show()
    // userDF.join(orderDF, "uid", "left")  // error here: no such overload in this Spark version
    userDF.join(orderDF, userDF("uid") === orderDF("id"), "left").show()

    println("----------------------- union ---------------------------")
    // Note: DataFrame union requires both sides to have the same number of columns,
    // matches columns by position, and, unlike SQL UNION, does NOT deduplicate
    userDF.union(orderDF).show()      // keeps duplicates (equivalent to SQL UNION ALL)
    userDF.unionAll(orderDF).show()   // deprecated alias of union, also keeps duplicates
  }
}
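One caveat, as the comments above note: union resolves columns by position and requires both sides to have the same number of columns, so unioning the five-column user DataFrame with the four-column order DataFrame would actually fail at analysis time, and neither union nor unionAll removes duplicates. A hedged sketch of a union that analyzes cleanly, reusing userDF and orderDF from above (the projected columns are chosen purely for illustration):

import org.apache.spark.sql.functions.lit

// Project both sides to the same positional shape before unioning;
// column names come from the left side, matching is by position
val userIds  = userDF.select($"uid", lit("user") as "src")
val orderIds = orderDF.select($"uid", lit("order") as "src")

userIds.union(orderIds).show()             // keeps duplicates, like SQL UNION ALL
userIds.union(orderIds).distinct().show()  // add distinct() for SQL UNION semantics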