Original post: https://dongkelun.com/2021/05/19/localSparkHiveWithKerberos/
Preface

Adding columns to a DataFrame comes up often in day-to-day work, but after not doing it for a while it is easy to forget which function to use, and looking it up again each time wastes time, so I am recording the code here as a reference. The post covers:
- Adding a column derived from existing columns
- Adding an auto-increment ID
- Adding a constant column
- Adding the current time
- Casting to timestamp type
- Casting to date type
```scala
package com.dkl.blog.spark.df

import java.util.Date

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object DfAddCols {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaLakeDemo")
      .master("local")
      .getOrCreate()

    val df = spark.range(0, 5).repartition(2)
      .withColumn("new_col", col("id") + 1) // add a column derived from an existing column
      .withColumn("uuid", monotonically_increasing_id) // built-in auto-increment ID: contiguous within a partition, not across partitions
      .withColumn("year", lit("2021")) // add a constant column with lit
      .withColumn("time", lit(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))) // add the current time
      .withColumn("timestamp", lit("2021-06-16").cast("timestamp")) // cast to timestamp
      .withColumn("date", lit("2021-06-16").cast("date")) // cast to date
    df.printSchema()
    df.show()

    // rebuild the DataFrame with zipWithIndex: IDs are contiguous across partitions
    val rows = df.rdd.zipWithIndex.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK = spark.createDataFrame(rows, StructType(StructField("pk", LongType, false) +: df.schema.fields))

    // rebuild the DataFrame with zipWithUniqueId
    val rows_2 = dfWithPK.rdd.zipWithUniqueId.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK_2 = spark.createDataFrame(rows_2, StructType(StructField("pk_2", LongType, false) +: dfWithPK.schema.fields))
    dfWithPK_2.show()

    // generate a sequential ID with a window function
    val w = Window.orderBy("id")
    dfWithPK_2.repartition(2).withColumn("pk_3", row_number().over(w)).show()

    spark.stop()
  }
}
```

Output:
```
root
 |-- id: long (nullable = false)
 |-- new_col: long (nullable = false)
 |-- uuid: long (nullable = false)
 |-- year: string (nullable = false)
 |-- time: string (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)

+---+-------+----------+----+-------------------+-------------------+----------+
| id|new_col|      uuid|year|               time|          timestamp|      date|
+---+-------+----------+----+-------------------+-------------------+----------+
|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+---+-------+----------+----+-------------------+-------------------+----------+

+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+

21/06/16 11:32:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|pk_3|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   1|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   2|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   3|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   4|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   5|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
```
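As a side note (my own explanation, not from the original post), the generated ID values in the output above follow directly from how Spark composes them; the small sketch below illustrates the arithmetic:

```scala
// Sketch: why the generated IDs look the way they do (not part of the original code).

// monotonically_increasing_id puts the partition index in the upper 31 bits of the ID
// and the row number within the partition in the lower 33 bits. With 2 partitions,
// the first row of partition 1 therefore gets (1L << 33) = 8589934592,
// which is exactly the jump seen in the uuid column.
val firstIdOfPartition1 = 1L << 33 // 8589934592

// zipWithUniqueId gives the i-th row of partition k the ID i * numPartitions + k.
// With 2 partitions this yields 0, 2, 4 for partition 0 and 1, 3 for partition 1,
// matching the pk_2 column, while zipWithIndex produces the contiguous 0..4 seen in pk.
val numPartitions = 2
val uniqueId = (i: Long, k: Long) => i * numPartitions + k
```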
UDF

You can also add new columns with user-defined functions; see Spark UDF使用详解及代码示例 for details, and weigh the pros and cons of each approach yourself. A minimal sketch is shown below.
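This is a minimal sketch of my own (not taken from that article); the names plusOne, new_col_udf, and dfWithUdfCol are hypothetical, and df is the DataFrame built in the code above:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Wrap an ordinary Scala function as a Spark UDF and use it like a built-in function.
val plusOne = udf((id: Long) => id + 1)

// Equivalent result to the col("id") + 1 version above, but the logic inside the UDF
// can be arbitrary Scala code (at the cost of being opaque to Catalyst optimizations).
val dfWithUdfCol = df.withColumn("new_col_udf", plusOne(col("id")))
dfWithUdfCol.show()
```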