Sometimes we want the data to be partitioned. For a detailed introduction to Hive partitioning, see the companion post: hive详细笔记(四) - Hive内部表、外部表、分区表、分桶表详解 (CSDN blog, with video walkthrough).
1 Writing to Hudi and syncing a Hive table - no partitioning

package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object OpHive_NoPartition {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender", DataTypes.StringType)

    // load the data and add a timestamp column at the same time
    val df: DataFrame = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show()

    // register the Hive JDBC driver (used by the Hive sync)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // write the data to Hudi and sync it to Hive at the same time
    df.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs())
      // table type COW; the concept is covered later
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update-time (pre-combine) field
      // .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender") // multiple partition fields
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(TABLE_NAME, "tb_user_d") // table name in Hudi
      .option("hoodie.datasource.hive_sync.enable", true) // sync the data to Hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_d") // table name in Hive
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      // .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // Hive partition fields (multiple)
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
      .mode(Append)
      .save("/hudi/tb_hudi_user_d")
  }
}
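Once the job above has run, the sync result can be checked from the same SparkSession (or from beeline against jdbc:hive2://doit01:10000). A minimal verification sketch, assuming the write completed and tb_user_d was synced into Hive's default database:

// Verification sketch: show the synced (non-partitioned) table definition and read a few rows.
sparkSession.sql("show create table tb_user_d").show(false)
sparkSession.sql("select id, name, age, city, gender, score, ts from tb_user_d").show(false)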
2 Writing to Hudi and syncing a Hive table - single-column partition

package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object OpHive_PartitionByString {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.config.HoodieWriteConfig._

    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)

    // load the data and add a timestamp column at the same time
    val df: DataFrame = sparkSession.read.schema(schema).csv("/user.txt")
      .withColumn("ts", lit(commitTime))

    // register the Hive JDBC driver (used by the Hive sync)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // write the data to Hudi and sync it to Hive at the same time
    df.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs())
      // table type COW; the concept is covered later
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update-time (pre-combine) field
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city") // Hudi partition column
      .option(TABLE_NAME, "tb_user_a") // table name in Hudi
      .option("hoodie.datasource.hive_sync.enable", true) // sync the data to Hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_a") // table name in Hive
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "city") // Hive partition field
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .mode(Append)
      .save("/hudi/tb_hudi_user_a")
  }
}
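The Hive sync should register one partition per distinct city value. A minimal sketch for checking the registered partitions and running a partition-pruned query (the sample value 'beijing' is an assumption; use a city that actually exists in your /user.txt):

// List the partitions the sync created, then query a single partition.
sparkSession.sql("show partitions tb_user_a").show(false)
sparkSession.sql("select id, name, age, score from tb_user_a where city = 'beijing'").show(false)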
3 Writing to Hudi and syncing a Hive table - multi-column partition

package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object OpHive_PartitionByMutiable {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender", DataTypes.StringType)

    // load the data and add a timestamp column at the same time
    val df: DataFrame = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show()

    // register the Hive JDBC driver (used by the Hive sync)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // write the data to Hudi and sync it to Hive at the same time
    df.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs())
      // table type COW; the concept is covered later
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update-time (pre-combine) field
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender") // multiple partition fields
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
      .option(TABLE_NAME, "tb_user_c") // table name in Hudi
      .option("hoodie.datasource.hive_sync.enable", true) // sync the data to Hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_c") // table name in Hive
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // Hive partition fields (multiple)
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .mode(Append)
      .save("/hudi/tb_hudi_user_c")
  }
}
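To see how the two partition columns are combined into a single partition path, the _hoodie_partition_path metadata column of the synced table can be inspected. A small sketch, assuming the sync above succeeded (the actual path values depend on your data; example 5 below produces the same layout in Hive key=value style):

// Show the Hudi-generated partition path next to the two partition columns it was built from.
sparkSession.sql(
  "select distinct _hoodie_partition_path, city, gender from tb_user_c"
).show(false)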
4 Writing to Hudi and syncing a Hive table - date-formatted partition

package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object OpHive_PartitionByDate {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("date", DataTypes.StringType)

    // load the data and add a timestamp column at the same time
    val df: DataFrame = sparkSession.read.schema(schema).csv("/user2.txt")
      .withColumn("ts", lit(commitTime))
    df.show()

    // Normalize the date column: the partition path is expected in yyyy/mm/dd format here.
    // A custom extractor class can also be written to accept other date formats (see the sketch after this example).
    val rdd: RDD[Row] = df.rdd.map(row => {
      val id: String = row.getAs[String]("id")
      val name: String = row.getAs[String]("name")
      val age: Int = row.getAs[Int]("age")
      val city: String = row.getAs[String]("city")
      val score: Double = row.getAs[Double]("score")
      val date: String = row.getAs[String]("date").replaceAll("-", "/")
      val ts: String = row.getAs[String]("ts")
      Row(id, name, age, city, score, date, ts)
    })
    val schema2 = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("date", DataTypes.StringType)
      .add("ts", DataTypes.StringType)
    val df2 = sparkSession.createDataFrame(rdd, schema2)
    df2.show()

    // register the Hive JDBC driver (used by the Hive sync)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // write the data to Hudi and sync it to Hive at the same time
    df2.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs())
      // DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY can be set to org.apache.hudi.keygen.SimpleKeyGenerator,
      // or left unset, since SimpleKeyGenerator is the default
      // table type COW; the concept is covered later
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update-time (pre-combine) field
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date") // Hudi partition column
      .option(TABLE_NAME, "tb_user_b") // table name in Hudi
      .option("hoodie.datasource.hive_sync.enable", true) // sync the data to Hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_b") // table name in Hive
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "date") // Hive partition field
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      // NOTE the extractor class used here: SlashEncodedDayPartitionValueExtractor is dedicated to
      // yyyy/mm/dd (day-based) partition paths; a custom extractor can be supplied for other formats
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
      .mode(Append)
      .save("/hudi/tb_hudi_user_b")
  }
}
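As noted in the comments above, the dash-to-slash rewrite is only needed because SlashEncodedDayPartitionValueExtractor expects yyyy/mm/dd paths; an extractor for a different format can be written by hand. A minimal sketch, assuming a Hudi release whose org.apache.hudi.hive.PartitionValueExtractor interface exposes extractPartitionValuesInPath (check the interface in your version before relying on it); the class name and the yyyy-MM-dd format are purely illustrative:

package com.doitedu.demo

import java.util.{Collections, List => JList}

import org.apache.hudi.hive.PartitionValueExtractor

// Hypothetical extractor: accepts a partition path written as yyyy-MM-dd (e.g. "2021-01-15")
// and returns it as the single Hive partition value. Register it via
// hoodie.datasource.hive_sync.partition_extractor_class instead of the slash-encoded one.
class DashEncodedDayPartitionValueExtractor extends PartitionValueExtractor {
  override def extractPartitionValuesInPath(partitionPath: String): JList[String] = {
    val parts = partitionPath.split("-")
    if (parts.length != 3) {
      throw new IllegalArgumentException(
        s"Partition path $partitionPath is not in the expected yyyy-MM-dd form")
    }
    Collections.singletonList(partitionPath)
  }
}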
5 Writing to Hudi and syncing a Hive table - Hive-style partition paths in the Hudi directory

package com.doitedu.demo

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object OpHive_Partition_HIveStyle {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("testhive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("datanucleus.schema.autoCreateTables", "true")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._
    import org.apache.spark.sql.functions._

    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val schema = new StructType()
      .add("id", DataTypes.StringType)
      .add("name", DataTypes.StringType)
      .add("age", DataTypes.IntegerType)
      .add("city", DataTypes.StringType)
      .add("score", DataTypes.DoubleType)
      .add("gender", DataTypes.StringType)

    // load the data and add a timestamp column at the same time
    val df: DataFrame = sparkSession.read.schema(schema).csv("/user3.txt")
      .withColumn("ts", lit(commitTime))
    df.show()

    // register the Hive JDBC driver (used by the Hive sync)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // write the data to Hudi and sync it to Hive at the same time
    df.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs())
      // table type COW; the concept is covered later
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // update-time (pre-combine) field
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "city,gender") // multiple partition fields
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
      // -------------- use Hive-style (key=value) partition paths in the Hudi directory
      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
      .option(TABLE_NAME, "tb_user_e") // table name in Hudi
      .option("hoodie.datasource.hive_sync.enable", true) // sync the data to Hive
      .option("hoodie.datasource.hive_sync.table", "tb_user_e") // table name in Hive
      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://doit01:10000")
      .option("hoodie.datasource.hive_sync.partition_fields", "city,gender") // Hive partition fields (multiple)
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.embed.timeline.server", false)
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .mode(Append)
      .save("/hudi/tb_hudi_user_e")
  }
}
With Hive-style partitioning enabled, the partition directories under the Hudi base path use the same key=value layout as a Hive table, so the Hudi directory structure is consistent with the format of the synced Hive table.
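This can be confirmed by listing the first-level directories under the base path used in example 5. A small sketch using the Hadoop FileSystem API from the same SparkSession (the base path comes from the example; the actual directory names depend on your data):

import org.apache.hadoop.fs.{FileSystem, Path}

// List the first-level entries of the hive-style Hudi table; partition directories
// should appear as city=<value> (plus Hudi's .hoodie metadata directory).
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/hudi/tb_hudi_user_e"))
  .filter(_.isDirectory)
  .map(_.getPath.getName)
  .foreach(println)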