(1)统计每个公司每年按月累计收入
(2)统计每个公司当月比上年同期增长率
// Demo: (1) per-company running (cumulative) monthly revenue within each year,
//       (2) per-company growth rate of each month vs. the same month last year.
object Demo05liti2 {
  def main(args: Array[String]): Unit = {
    // Local imports keep the entry point self-contained.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window

    val spark: SparkSession = SparkSession.builder()
      .appName("Demo05liti2")
      .master("local")
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()

    // Bring in $"col" syntax and the built-in SQL functions.
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Each input row: company (burk), year, and twelve monthly revenue
    // columns tsl01..tsl12 (wide format).
    val burDf = spark.read
      .format("csv")
      .schema("burk String,year String,tsl01 String,tsl02 String,tsl03 String," +
        "tsl04 String,tsl05 String,tsl06 String,tsl07 String,tsl08 String,tsl09 String," +
        "tsl10 String,tsl11 String,tsl12 String")
      // FIX: the CSV delimiter option is spelled "sep"; the original "seq" was
      // an unknown option that Spark silently ignored (it only worked because
      // the default separator happens to be a comma).
      .option("sep", ",")
      .load("sparkproject/data/bulks.txt")

    // Register a temp view for the SQL variant.
    burDf.createOrReplaceTempView("burlks")

    // --- (1) SQL variant: unpivot the 12 month columns into (month, price)
    // rows via lateral view explode, then take a running sum per
    // company + year ordered by month.
    spark.sql(
      """
        |select burk
        |      ,year
        |      ,month
        |      ,price
        |      ,sum(price) over(partition by burk,year order by month) as rk
        |from burlks
        |lateral view explode(Map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12))v1 as month,price
        |""".stripMargin)
      .show()

    println("-" * 50)

    // --- (1) DSL variant. explode() needs a map Column; the integer keys
    // must be wrapped in expr(...) because map() takes Columns, not literals.
    val mapl = map(
      expr("1"), $"tsl01",
      expr("2"), $"tsl02",
      expr("3"), $"tsl03",
      expr("4"), $"tsl04",
      expr("5"), $"tsl05",
      expr("6"), $"tsl06",
      expr("7"), $"tsl07",
      expr("8"), $"tsl08",
      expr("9"), $"tsl09",
      expr("10"), $"tsl10",
      expr("11"), $"tsl11",
      expr("12"), $"tsl12"
    )

    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      .select($"burk", $"year", $"month", $"price",
        // FIX: the original ordered by $"month" twice; one ordering column
        // is sufficient.
        sum($"price") over Window.partitionBy($"burk", $"year").orderBy($"month") as "sum_price")
      // FIX: the original built this DataFrame but never displayed it, so the
      // whole query was dead code (Spark is lazy). Show it like the others.
      .show(100)

    println("-" * 60)

    // --- (2) DSL variant: growth rate vs. the same month of the previous year.
    burDf
      .select($"burk", $"year", explode(mapl) as Array("month", "price"))
      // lag(price, 1) over (partition by burk, month order by year) looks one
      // year back within the same company + month => last year's value.
      .select($"burk", $"year", $"month", $"price",
        lag($"price", 1) over Window.partitionBy($"burk", $"month").orderBy($"year") as "last_price")
      // Growth rate = price / last_price - 1, rounded to 5 decimals.
      // NOTE(review): when there is no prior year, lag yields null and the
      // coalesce defaults the rate to 1 — original behavior, kept as-is,
      // though null might be the more honest default; confirm intent.
      .select($"burk", $"year", $"month",
        round(coalesce($"price" / $"last_price" - 1, expr("1")), 5) as "incrprer")
      .show(100)
  }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)