1、将构建好的Iceberg的spark模块jar包,复制到spark jars下
cp /opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs def readTale(sparkSession: SparkSession) = { //三种方式 sparkSession.table("hadoop_prod.db.testA").show() sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show() sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show()// 路径到表就行,不要到具体文件 }3.2、Spark DF读取Iceberg快照表的两种方式
def readSnapShots(sparkSession: SparkSession) = { //根据查询 hadoop_prod.db.testA.snapshots 快照表可以知道快照时间和快照id //根据时间戳读取,必须是时间戳 不能使用格式化后的时间 sparkSession.read .option("as-of-timestamp", "1624961454000") //毫秒时间戳,查询比该值时间更早的快照 .format("iceberg") .load("hadoop_prod.db.testA").show() //根据快照 id 查询 sparkSession.read .option("snapshot-id", "9054909815461789342") .format("iceberg") .load("hadoop_prod.db.testA").show() }3.3、写入数据并自动创建表
def writeAndCreateTable(sparkSession: SparkSession) = { import sparkSession.implicits._ import org.apache.spark.sql.functions._ val data = sparkSession.createDataset[Student](Array(Student(1001, " 张 三 ", 18, "2021-06-28"), Student(1002, "李四", 19, "2021-06-29"), Student(1003, "王五", 20, "2021-06-29"))) data.writeTo("hadoop_prod.db.test1").partitionedBy(col("dt")) //指定dt为分区列 .create() } case class Student(id: Int, name: String, age: Int, dt: String)3.4、写数据 3.4.1、Append
def AppendTable(sparkSession: SparkSession) = { //两种方式 import sparkSession.implicits._ val data = sparkSession.createDataset(Array(Student(1003, "王五", 11, "2021-06-29"), Student(1004, "赵六", 10, "2021-06-30"))) data.writeTo("hadoop_prod.db.test1").append() // 使用DataframeWriterV2 API spark3.0 data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1") //使用DataframeWriterV1 API spark2.4 }3.4.2、Overwrite
1、动态覆盖,只会刷新所属分区数据
// 动态覆盖,只会刷新所属分区数据 def OverWriteTable(sparkSession: SparkSession) = { import sparkSession.implicits._ val data = sparkSession.createDataset(Array(Student(1003, " 王五", 11, "2021-06-29"), Student(1004, "赵六", 10, "2021-06-30"))) data.writeTo("hadoop_prod.db.test1").overwritePartitions() }
2、静态覆盖,手动指定分区
def OverWriteTable2(sparkSession: SparkSession) = { import sparkSession.implicits._ val data = sparkSession.createDataset(Array(Student(1, "s1", 1, "111"), Student(2, "s2", 2, "111"))) data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30") }
注意:输出表不能存在多个分区。不然会报错
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)