SparkSQL--DSL风格API(TableApi)语法

SparkSQL--DSL风格API(TableApi)语法,第1张

SparkSQL--DSL风格API(TableApi)语法

DSL(DataSet Language)风格API,就是编程API的方式,来实现SQL语法

DSL:特定领域语言

DataSet的TableApi有一个特点:运算后返回值必回到Dataframe

因为select后,得到的结构,无法判断返回值的具体类型,只能用通用的Row封装

   TableAPI基本 *** 作

数据准备
id,name,age,city,score
1,张三,21,BJ,80.0
2,李四,23,BJ,82.0
3,王五,20,SH,88.6
4,赵六,26,SH,83.0
5,田七,30,SZ,90.0

object TableAPI01 {
  Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    //获取环境
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    //读取数据 创建df
    val df: Dataframe = spark.read.option("header", true).option("inferSchema", true).csv("SQLData/csv/stu.csv")
    //导入spark中的隐式和函数
    import spark.implicits._
    import org.apache.spark.sql.functions._
    println("-----------------------select及表达式----------------------------")
    //使用字符串表达"列"
    df.select("id","name").show()  //无法对值进行二次 *** 作
    //df.select("id + 1","name").show //id+1会被视为一个列名从而出错
    //使用字符串形式表达sql表达式,应该使用selectExpr
    df.selectExpr("id+1","upper(name)").show()
    //使用$符号创建Column对象来表达"列"
    df.select($"id",upper($"name"),$"age"+10).show()
    //使用单边单引号创建Column对象来表达"列"
    df.select('id,upper('name),'age+10).show()
    //使用col函数创建Column对象来表达"列"
    df.select(col("id"),upper(col("name")),col("age")+10).show()
    println("-----------------------起别名----------------------------")
    //使用字符串表达"列"
    //df.select("id","name").show()  //无法对值进行二次 *** 作
    //使用字符串形式表达sql表达式,应该使用selectExpr
    df.selectExpr("id+1 as new_id","upper(name) as new_name").show()
    //使用$符号创建Column对象来表达"列"
    df.select($"id" as "new_id",upper($"name") as "new_name",$"age"+10 as "new_age").show()
    //使用单边单引号创建Column对象来表达"列"
    df.select('id as "new_id",upper('name) as "new_name",'age+10 as "new_age").show()
    //使用col函数创建Column对象来表达"列"
    df.select(col("id") as "new_id"
      ,upper(col("name")) as "new_name"
      ,col("age")+10 as "new_age").show()
    println("-----------------------条件过滤----------------------------")
    df.where("id > 1 and city='BJ'").show()
    df.where('id>1 and 'score >80 ).show()
    println("-----------------------order by----------------------------")
    df.orderBy($"id".desc).show() //id 降序
    df.orderBy("age").show()  //age升序
    println("-----------------------group by 聚合函数----------------------------")
    df.groupBy("city").count().show() //每个city的人数
    df.groupBy("city").avg("score").show //每个city的平均分
    //常用情况  agg(sum() as '别名',...)
    df.groupBy("city").agg(
      avg("score") as "avg_score",
      sum("score") as "sum_score",
      count(lit(1)) as "cnt", //lit(值) 将值转换成常量字段
      collect_list("name") as "names"
    ).show()
    println("-----------------------子查询---------------------------")
    
    df.groupBy("city").agg(sum("score") as "sum_score")
      .where($"sum_score">150).show()
  }
}

窗口函数示例

数据准备

shop1,2022-01-01,500
shop1,2022-01-02,500
shop1,2022-02-01,500
shop1,2022-04-01,500
shop1,2022-03-01,500
shop1,2022-06-01,500
shop2,2022-01-01,500
shop2,2022-05-01,500
shop2,2022-02-01,500
shop2,2022-03-01,500
object TableAPI02 {
  Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    //获取环境
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    //读取数据 创建df
    //自定义结构
    val schema: StructType = StructType(
      Seq(
        StructField("name", DataTypes.StringType),
        StructField("date", DataTypes.StringType),
        StructField("amount", DataTypes.IntegerType)
      )
    )
    val df: Dataframe = spark.read.schema(schema).csv("SQLData/shop/shop.csv")
    //导入函数和隐式
    import spark.implicits._
    import org.apache.spark.sql.functions._
    //求每个店铺每个月的总金额以及总的累计金额
    //1.求每个店铺每个月的总金额
    val df2: Dataframe = df.groupBy($"name", substring($"date", 0, 7) as "month")
      .agg(sum("amount") as "m_sum_amount")
    //2.求总的累计金额
    df2.select('name,'month,'m_sum_amount,
      sum("m_sum_amount")
        over(Window.partitionBy("name").orderBy("month")) as "total_money"
    ).show()
    df2.select('name,'month,'m_sum_amount,
      sum("m_sum_amount")
        over(Window.partitionBy("name").orderBy("month")
        .rowsBetween(Window.unboundedPreceding,Window.currentRow)) as "total_money" //指定行范围
    ).show()
  }
}

 join关联查询和union

数据准备 user

uid,name,age,gender,city
1,zss,18,M,BJ
2,ls,20,F,BJ
3,wx,30,M,SH

数据准备 order

oid,money,uid,id
1001,100,1,1
1002,100,2,2
1003,100,3,3
1004,100,1,1
1005,100,2,2
1006,100,3,3

 

 

object TableAPI03 {
  Logger.getLogger("org").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    //获取环境
    val spark: SparkSession = SparkSession.builder()
      .appName("TableAPI")
      .master("local[*]")
      .getOrCreate()
    //读取数据 创建df
    val userDF: Dataframe = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/user.csv")
    val orderDF: Dataframe = spark.read.option("header", true).option("inferSchema", true).csv("sql_data/csv/order.csv")
    //导入spark中的隐式和函数
    import org.apache.spark.sql.functions._
    import spark.implicits._
    println("-----------------------join关联查询---------------------------")
    userDF.crossJoin(orderDF).show()  //笛卡尔积
    userDF.join(orderDF).show()    //没有关联条件  也是笛卡尔积
    userDF.join(orderDF,"uid").show()
    userDF.join(orderDF,Seq("uid")).show()
    userDF.join(orderDF,userDF("uid")===orderDF("id")).show() //当两张表的关联条件的字段名不一致时,可以使用这种形式
    //外连接
    userDF.join(orderDF,Seq("uid"),"left").show()
    userDF.join(orderDF,Seq("uid"),"right").show()
    //userDF.join(orderDF,"uid","left")  错误 没有这种构造方法
    userDF.join(orderDF,userDF("uid")===orderDF("id"),"left").show()
    println("-----------------------union---------------------------")
    userDF.union(orderDF).show()   //去重
    userDF.unionAll(orderDF).show() //不去重
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5705631.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存