Spark DataFrame 添加列总结

Spark DataFrame 添加列总结,第1张

Spark DataFrame 添加列总结

我的原创地址:https://dongkelun.com/2021/05/19/localSparkHiveWithKerberos/

前言

因添加列在平时可能会经常用到,但是长时间不用,可能会忘记应该用哪个函数,这样再重新查找比较耽误时间,于是总结代码进行备忘。主要总结:

  • 根据现有的列添加
  • 添加自增ID
  • 添加一列常量
  • 添加当前时间
  • 转换为timestamp类型
  • 转换为date类型
代码
package com.dkl.blog.spark.df

import java.util.Date

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}


object DfAddCols {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaLakeDemo")
      .master("local")
      .getOrCreate()

    val df = spark.range(0, 5).repartition(2)
      .withColumn("new_col", col("id") + 1) //根据现有的列添加
      .withColumn("uuid", monotonically_increasing_id) //自带函数添加自增ID,分区不连续,分区内连续
      .withColumn("year", lit("2021")) //添加一列常量,主要用lit函数
      .withColumn("time", lit(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))) //添加当前时间
      .withColumn("timestamp", lit("2021-06-16").cast("timestamp")) //转换为timestamp类型
      .withColumn("date", lit("2021-06-16").cast("date")) //转换为date类型

    df.printSchema()

    df.show()

    //用zipWithIndex重建DF,分区连续
    val rows = df.rdd.zipWithIndex.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK = spark.createDataframe(rows, StructType(StructField("pk", LongType, false) +: df.schema.fields))

    //用zipWithUniqueId重建DF
    val rows_2 = dfWithPK.rdd.zipWithUniqueId.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
    val dfWithPK_2 = spark.createDataframe(rows_2, StructType(StructField("pk_2", LongType, false) +: dfWithPK.schema.fields))

    dfWithPK_2.show()

    //通过窗口函数排序
    val w = Window.orderBy("id")
    dfWithPK_2.repartition(2).withColumn("pk_3", row_number().over(w)).show()

    spark.stop()
  }
}

运行结果
 |-- id: long (nullable = false)
 |-- new_col: long (nullable = false)
 |-- uuid: long (nullable = false)
 |-- year: string (nullable = false)
 |-- time: string (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)

+---+-------+----------+----+-------------------+-------------------+----------+
| id|new_col|      uuid|year|               time|          timestamp|      date|
+---+-------+----------+----+-------------------+-------------------+----------+
|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+---+-------+----------+----+-------------------+-------------------+----------+

+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+

21/06/16 11:32:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|pk_2| pk| id|new_col|      uuid|year|               time|          timestamp|      date|pk_3|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+
|   0|  0|  0|      1|         0|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   1|
|   1|  3|  1|      2|8589934592|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   2|
|   2|  1|  2|      3|         1|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   3|
|   3|  4|  3|      4|8589934593|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   4|
|   4|  2|  4|      5|         2|2021|2021-06-16 11:32:33|2021-06-16 11:32:33|2021-06-16|   5|
+----+---+---+-------+----------+----+-------------------+-------------------+----------+----+


UDF

也可以使用自定义函数添加新列,具体可以参考Spark UDF使用详解及代码示例,各自的优劣可以自己总结

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653996.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存