9.3.5 Spark SQL


9.3.5 Spark SQL columns-to-rows (unpivot) exercise with lateral view explode()

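(1) For each company and each year, compute the cumulative revenue month by month.
(2) For each company, compute each month's growth rate relative to the same month of the previous year.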
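Both tasks begin by unpivoting each wide row (one column per month, `tsl01` to `tsl12`) into twelve `(month, price)` rows, which is what `lateral view explode(map(...))` does; task (1) then keeps a running total per company and year. The sketch below imitates that pipeline with plain Scala collections instead of Spark, so the logic can be checked without a cluster. The case classes `WideRow` and `Monthly`, the company name and the revenue figures are all hypothetical.

```scala
// Hypothetical wide record: one revenue value per month, like tsl01..tsl12.
final case class WideRow(burk: String, year: String, months: Vector[Double])

// One unpivoted record, like a row produced by lateral view explode(map(...)).
final case class Monthly(burk: String, year: String, month: Int, price: Double)

object UnpivotSketch {
  // Imitates explode(map(1, tsl01, ..., 12, tsl12)): one output row per month, keyed 1..12.
  def unpivot(r: WideRow): Seq[Monthly] =
    r.months.zipWithIndex.map { case (p, i) => Monthly(r.burk, r.year, i + 1, p) }

  // Imitates sum(price) over (partition by burk, year order by month):
  // a running total that restarts for every (burk, year) group.
  def runningTotals(rows: Seq[Monthly]): Seq[(Monthly, Double)] =
    rows
      .groupBy(m => (m.burk, m.year))
      .values
      .toSeq
      .flatMap { group =>
        val sorted = group.sortBy(_.month)
        // scanLeft yields 0.0 followed by the prefix sums; tail drops the leading 0.0.
        sorted.zip(sorted.scanLeft(0.0)(_ + _.price).tail)
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: company "A", year 2020, monthly revenue 1.0 .. 12.0.
    val row = WideRow("A", "2020", Vector.tabulate(12)(i => (i + 1).toDouble))
    runningTotals(unpivot(row)).foreach { case (m, total) =>
      println(s"${m.burk} ${m.year} month=${m.month} price=${m.price} running=$total")
    }
  }
}
```

Grouping first and sorting inside each group mirrors the `partition by ... order by ...` clause: the window never sees rows from another company or year.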

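coalesce() accepts any number of arguments and returns the first one that is not null: if the first argument is null it returns the second, if the second is also null it returns the third, and so on.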
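The rule above can be illustrated in plain Scala by letting `Option` stand in for a nullable value (`None` playing the role of NULL). This is only an analogy for the SQL semantics, not Spark's `coalesce` itself; `CoalesceSketch` is a hypothetical name.

```scala
object CoalesceSketch {
  // Imitates SQL coalesce(a, b, c, ...): the first argument that is not NULL,
  // or NULL (None) when every argument is NULL.
  def coalesce[A](values: Option[A]*): Option[A] =
    values.collectFirst { case Some(v) => v }

  def main(args: Array[String]): Unit = {
    println(coalesce(None, Some(2), Some(3))) // Some(2)
    println(coalesce(None, None, Some(3)))    // Some(3)
    println(coalesce[Int](None, None))        // None
  }
}
```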

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object Demo05liti2 {

  def main(args: Array[String]): Unit = {
    
    val spark: SparkSession = SparkSession.builder()
      .appName("Demo05liti2")
      .master("local")
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()

    // Import the implicit conversions (for the $"col" syntax) and all built-in SQL functions
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    val burDf = spark.read
      .format("csv")
      .schema("burk String,year String,tsl01 String,tsl02 String,tsl03 String," +
        "tsl04 String,tsl05 String,tsl06 String,tsl07 String,tsl08 String,tsl09 String," +
        "tsl10 String,tsl11 String,tsl12 String")
      .option("sep", ",")
      .load("sparkproject/data/bulks.txt")

    
    // Register the DataFrame as a temporary view
    burDf.createOrReplaceTempView("burlks")

    // Task (1) in SQL: unpivot the 12 month columns, then keep a running total per company and year
    spark.sql(
      """
        |select burk
        |       ,year
        |       ,month
        |       ,price
        |       ,sum(price) over(partition by burk,year order by month) as sum_price
        |from burlks
        |lateral view explode(Map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12))v1 as month,price
        |
      """.stripMargin)
      .show()
    println("-" * 50)

    // Task (1) with the DSL
    // map() only accepts Column arguments, so the literal keys are wrapped with expr (lit would also work)
    val mapl = map(expr("1"), $"tsl01"
      , expr("2"), $"tsl02"
      , expr("3"), $"tsl03"
      , expr("4"), $"tsl04"
      , expr("5"), $"tsl05"
      , expr("6"), $"tsl06"
      , expr("7"), $"tsl07"
      , expr("8"), $"tsl08"
      , expr("9"), $"tsl09"
      , expr("10"), $"tsl10"
      , expr("11"), $"tsl11"
      , expr("12"), $"tsl12"
    )
    
    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      .select($"burk", $"year", $"month", $"price", sum($"price") over Window.partitionBy($"burk", $"year").orderBy($"month") as "sum_price")
      .show()
    println("-" * 60)

    
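    // Task (2) with the DSL: growth rate versus the same month of the previous year
    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      // lag looks back one row; partitioning by company and month groups the same month across years, ordered by year
      .select($"burk", $"year", $"month", $"price", lag($"price", 1) over Window.partitionBy($"burk", $"month").orderBy($"year") as "last_price")
      // growth = price / last_price - 1; when there is no prior-year value, coalesce falls back to 1
      .select($"burk", $"year", $"month", round(coalesce($"price" / $"last_price" - 1, expr("1")), 5) as "incrprer")
      .show(100)

    spark.stop()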
  }
}
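Task (2) hinges on `lag(price, 1) over (partition by burk, month order by year)`: inside each company and calendar month, every row sees the previous year's value, and the earliest year sees null. The plain-Scala sketch below reproduces that logic, including the code's choice of falling back to 1 through `coalesce` when there is no prior-year value, and `round(..., 5)` with half-up rounding. The case class `MonthRev`, the object name and the figures are hypothetical.

```scala
final case class MonthRev(burk: String, year: Int, month: Int, price: Double)

object YoYSketch {
  // Imitates lag(price, 1) over (partition by burk, month order by year),
  // followed by round(coalesce(price / last_price - 1, 1), 5).
  def growth(rows: Seq[MonthRev]): Seq[(MonthRev, Double)] =
    rows
      .groupBy(r => (r.burk, r.month))
      .values
      .toSeq
      .flatMap { group =>
        val byYear = group.sortBy(_.year)
        // lag(price, 1): previous year's price in the same group, None for the first year.
        val lastPrices = Option.empty[Double] +: byYear.init.map(r => Option(r.price))
        byYear.zip(lastPrices).map { case (r, last) =>
          // Spark (non-ANSI mode) returns null for x / 0, so a zero divisor is treated as missing.
          val rate = last.filter(_ != 0.0).map(l => r.price / l - 1).getOrElse(1.0) // coalesce(..., 1)
          (r, BigDecimal(rate).setScale(5, BigDecimal.RoundingMode.HALF_UP).toDouble)
        }
      }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: company "A", January revenue in two consecutive years.
    val rows = Seq(MonthRev("A", 2020, 1, 100.0), MonthRev("A", 2021, 1, 110.0))
    growth(rows).foreach { case (r, g) => println(s"${r.burk} ${r.year} month=${r.month} growth=$g") }
  }
}
```

Note that the fallback value 1 marks "no prior-year data" rather than a real growth rate; returning null (dropping the `coalesce`) would be an alternative if downstream consumers need to tell the two apart.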

Source: 内存溢出. Please credit the source when reposting.

Original URL: http://outofmemory.cn/zaji/5606574.html
