Iceberg(二)对接Spark

Iceberg(二)对接Spark,第1张

Iceberg(二)对接Spark 1、配置参数和jar包

        1、将构建好的Iceberg的spark模块jar包,复制到spark jars下

cp /opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs
  def readTale(sparkSession: SparkSession) = {
    //三种方式
    sparkSession.table("hadoop_prod.db.testA").show()
    sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show()
    sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show()// 路径到表就行,不要到具体文件
  }
3.2、Spark DF读取Iceberg快照表的两种方式
def readSnapShots(sparkSession: SparkSession) = {
  //根据查询 hadoop_prod.db.testA.snapshots 快照表可以知道快照时间和快照id
  //根据时间戳读取,必须是时间戳 不能使用格式化后的时间
  sparkSession.read
    .option("as-of-timestamp", "1624961454000") //毫秒时间戳,查询比该值时间更早的快照
    .format("iceberg")
    .load("hadoop_prod.db.testA").show()

  //根据快照 id 查询
  sparkSession.read
    .option("snapshot-id", "9054909815461789342")
    .format("iceberg")
    .load("hadoop_prod.db.testA").show()
}
3.3、写入数据并自动创建表
def writeAndCreateTable(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  import org.apache.spark.sql.functions._
  val data = sparkSession.createDataset[Student](Array(Student(1001, " 张 三 ", 18, "2021-06-28"),
    Student(1002, "李四", 19, "2021-06-29"), Student(1003, "王五", 20, "2021-06-29")))
  data.writeTo("hadoop_prod.db.test1").partitionedBy(col("dt")) //指定dt为分区列
    .create()
}

case class Student(id: Int, name: String, age: Int, dt: String)
3.4、写数据 3.4.1、Append
def AppendTable(sparkSession: SparkSession) = {
  //两种方式
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(Student(1003, "王五", 11, "2021-06-29"), Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").append()  // 使用DataframeWriterV2 API  spark3.0
  data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1")	//使用DataframeWriterV1 API spark2.4
}
3.4.2、Overwrite

        1、动态覆盖,只会刷新所属分区数据

// 动态覆盖,只会刷新所属分区数据
def OverWriteTable(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(Student(1003, " 王五", 11, "2021-06-29"),
    Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").overwritePartitions()
}

        2、静态覆盖,手动指定分区

def OverWriteTable2(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(Student(1, "s1", 1, "111"), Student(2, "s2", 2, "111")))
  data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30")
}

        注意:输出表不能存在多个分区。不然会报错

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701770.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存