spark处理数据落地Hudi同步HIVE(01)-分区详解

spark处理数据落地Hudi同步HIVE(01)-分区详解,第1张

spark处理数据落地Hudi同步HIVE(01)-分区详解

有的时候我们期望数据是分区的,关于hive的分区详细介绍请连接: hive详细笔记(四)-Hive内部表,外部表,分区表,分桶表详解(附带讲解视频)_JAVA_JAVA-CSDN博客

1 落地Hudi同步HIVE表-没有分区
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}


object OpHive_NoPartition {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender" , DataTypes.StringType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show
   // 注册hive驱动
 Class.forName("org.apache.hive.jdbc.HiveDriver")
   // 将数据写入到HUDI  同时同步到hive中
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
     // .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender")//指定多个分区字段
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY , "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(TABLE_NAME, "tb_user_d")  //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true)  // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_d") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
     // .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // hive分区字段  指定多个分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").
      mode(Append).
      save("/hudi/tb_hudi_user_d");
  }

}
2 落地Hudi同步HIVE表-单字段分区
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}


object OpHive_PartitionByString {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.config.HoodieWriteConfig._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType().add("id", DataTypes.StringType).add("name", DataTypes.StringType).add("age", DataTypes.IntegerType).add("city", DataTypes.StringType).add("score", DataTypes.DoubleType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user.txt")
      .withColumn("ts", lit(commitTime))
   // 注册hive驱动
    Class.forName("org.apache.hive.jdbc.HiveDriver")
   // 将数据写入到HUDI  同时同步到hive中
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city")//hudi分区列
      .option(TABLE_NAME, "tb_user_a")  //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true)  // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_a") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "city") // hive分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
      mode(Append).
      save("/hudi/tb_hudi_user_a");
  }

}
3 落地Hudi同步HIVE表-多字段分区
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}


object OpHive_PartitionByMutiable {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender" , DataTypes.StringType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))

    df.show

   // 注册hive驱动
 Class.forName("org.apache.hive.jdbc.HiveDriver")
   // 将数据写入到HUDI  同时同步到hive中
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender")//指定多个分区字段
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY , "org.apache.hudi.keygen.ComplexKeyGenerator")
      .option(TABLE_NAME, "tb_user_c")  //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true)  // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_c") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // hive分区字段  指定多个分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
      mode(Append).
      save("/hudi/tb_hudi_user_c");
  }

}
4 落地Hudi同步HIVE表-日期格式分区
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, Row, SparkSession}


object OpHive_PartitionByDate {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType().add("id", DataTypes.StringType).add("name", DataTypes.StringType).add("age", DataTypes.IntegerType).add("city", DataTypes.StringType).add("score", DataTypes.DoubleType)
      .add("date", DataTypes.StringType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user2.txt")
      .withColumn("ts", lit(commitTime))
    df.show()
    // 处理日期字段   hudi的分区字段要求的数据格式是  yyyy/mm/dd
    // 可以做自定义类实现接收各种类型的时间格式数据进行分区
    val rdd: RDD[Row] = df.rdd.map(row => {
      val id: String = row.getAs[String]("id")
      val name: String = row.getAs[String]("name")
      val age: Int = row.getAs[Int]("age")
      val city: String = row.getAs[String]("city")
      val score: Double = row.getAs[Double]("score")
      val date: String = row.getAs[String]("date").replaceAll("-", "/")
      val ts: String = row.getAs[String]("ts")
      val resRow: Row = Row(id, name, age, city, score, date, ts)
      resRow
    })

    val schema2 = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("date", DataTypes.StringType)
      .add("ts", DataTypes.StringType)

    val df2 = sparkSession.createDataframe(rdd, schema2)
    df2.show()

   // 注册hive驱动
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // 将数据写入到HUDI  同时同步到hive中
    df2.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      //DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()配置为org.apache.hudi.keygen.SimpleKeyGenerator,或者不配置该选项,默认为org.apache.hudi.keygen.SimpleKeyGenerator;
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date") //hudi分区列
      .option(TABLE_NAME, "tb_user_b") //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true) // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_b") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "date") // hive分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      //$$$$$$$$$$$$$$$$$$$$$$$$注意这里的抽取类
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor").
     // new  SlashEncodedDayPartitionValueExtractor  这个类专门处理时间分区的内容  我们可以自定义自己的分区
      mode(Append).
      save("/hudi/tb_hudi_user_b")
  }

}
5 落地Hudi同步HIVE表-HUDI目录使用HIVE格式分区
package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}


object OpHive_Partition_HIveStyle {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root");
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  
  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString //生成提交时间
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender" , DataTypes.StringType)
    // 加载数据同时添加一个 时间字段
    val df: Dataframe = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show
   // 注册hive驱动
 Class.forName("org.apache.hive.jdbc.HiveDriver")
   // 将数据写入到HUDI  同时同步到hive中
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs()).
      // 设置表类型  COW   概念在后面会涉及到
      option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") //设置主键
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // 数据更新时间
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender")//指定多个分区字段
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY , "org.apache.hudi.keygen.ComplexKeyGenerator")
      //--------------指定分区格式为hive的格式
      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY,"true")
      .option(TABLE_NAME, "tb_user_e")  //hudi中的表
      .option("hoodie.datasource.hive_sync.enable", true)  // 同步数据到hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_e") //hive中的表
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
     .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // hive分区字段  指定多个分区字段
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
      mode(Append).
      save("/hudi/tb_hudi_user_e");
  }

}

hudi目录中分区格式和hive表中的格式一致

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4685845.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存