项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1
id,name,profession,enroll,score 1,庄劲聪,经济学类,北京理工大学,551 2,吴雅思,经济学类,北京理工大学,529 3,周育传,经济学类,北京理工大学,682 4,丁俊伟,通信工程,北京电子科技学院,708 5,庄逸琳,通信工程,北京电子科技学院,708 6,吴志发,通信工程,北京电子科技学院,578 7,肖妮娜,通信工程,北京电子科技学院,557 8,蔡建明,通信工程,北京电子科技学院,583 9,林逸翔,通信工程,北京电子科技学院,543输入数据集2
id,name,profession,enroll,score 1,曾凰妹,金融学,北京电子科技学院,637 2,谢德炜,金融学,北京电子科技学院,542 4,王丽云,金融学,北京电子科技学院,626 5,吴鸿毅,金融学,北京电子科技学院,591 6,施珊珊,经济学类,北京理工大学,581 7,柯祥坤,经济学类,北京理工大学,650 1,庄劲聪,经济学类,北京理工大学,551输出数据
+---+------+----------------+ | id| 名称| 学院| +---+------+----------------+ | 1|庄劲聪| null| | 2|吴雅思| null| | 3|周育传| null| | 4|丁俊伟| null| | 5|庄逸琳| null| | 6|吴志发| null| | 7|肖妮娜| null| | 8|蔡建明| null| | 9|林逸翔| null| | 1|曾凰妹|北京电子科技学院| | 2|谢德炜|北京电子科技学院| | 4|王丽云|北京电子科技学院| | 5|吴鸿毅|北京电子科技学院| | 6|施珊珊| 北京理工大学| | 7|柯祥坤| 北京理工大学| | 1|庄劲聪| 北京理工大学| +---+------+----------------+程序代码
package com.cch.bigdata.spark.process.union import com.cch.bigdata.spark.process.AbstractTransform import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.{Column, Dataframe} import scala.collection.mutable import scala.collection.mutable.ListBuffer //union合并 *** 作 class Union extends AbstractTransform{ //1->合并不去重 2->合并行去重 private val union_type_set = Set(1,2) private val union_type = 1 //表1需要合并的列名 private val left_table_union_columns = Array[String]("id","name","aaa") //表2需要合并的列名 private val right_table_union_columns = Array[String]("id","name","enroll") //合并后的列的名称 //如果没有设置,则使用表1的列名 private val new_columns = Array[String]("id","名称","学院") override def process(): Unit = { if(!union_type_set.contains(union_type)){ throw new RuntimeException("合并类型错误!") } if(left_table_union_columns.isEmpty){ throw new RuntimeException("表1合并列不能为空!") } if(right_table_union_columns.isEmpty){ throw new RuntimeException("表2合并列不能为空!") } //处理表1列名称和别名映射 val nameMapping = new mutable.linkedHashMap[String, String]() var index = 0 left_table_union_columns.foreach(c=>{ nameMapping(c) = new_columns(index) index+=1 }) //第一个数据输入 val firstDF: Dataframe = loadCsv("src/main/resources/csv/admission_1.csv",spark) //获取第一个数据输入的schema val firstColumnSet:Set[String] = firstDF.schema.fieldNames.toSet //第二个数据输入 val secondDF: Dataframe = loadCsv("src/main/resources/csv/admission_2.csv",spark) //获取第二个数据输入的schema val secondColumnSet: Set[String] = secondDF.schema.fieldNames.toSet //构造表1的select参数 val leftColumnList: ListBuffer[Column] = new ListBuffer() //表1需要添加的新列 val withLeftColumnList: ListBuffer[String] = new ListBuffer() for ((k, v) <- nameMapping){ if(firstColumnSet.contains(k)){ //这个配置的union列是表1中的 leftColumnList.append(col(k).as(v)) }else{ //这个是个新列 withLeftColumnList.append(k) } } var leftQueryDf: Dataframe = firstDF.select(leftColumnList.map(c => { c }): _*) withLeftColumnList.foreach(c=>{ var asName: String = nameMapping(c) if(asName.isEmpty){ asName = c } leftQueryDf = leftQueryDf.withColumn(asName,lit(null)) }) //构造表2的select参数 val rightColumnList: ListBuffer[Column] = new ListBuffer() //表2需要添加的新列 val withRightColumnList: ListBuffer[String] = new ListBuffer() right_table_union_columns.foreach(c=>{ if(secondColumnSet.contains(c)){ //这个配置的union列是表2中的 rightColumnList.append(col(c)) }else{ //这个是表2的新列 withRightColumnList.append(c) } }) var rightQueryDf: Dataframe = secondDF.select(rightColumnList.map(c => { c }): _*) withRightColumnList.foreach(c=>{ rightQueryDf = rightQueryDf.withColumn(c,lit(null)) }) union_type match { case 1 =>{ //不去重合并 leftQueryDf.union(rightQueryDf).show() } case 2 =>{ //去重合并 leftQueryDf.union(rightQueryDf).dropDuplicates().show() } } } override def getAppName(): String = "union合并" } object Union{ def main(args: Array[String]): Unit = { new Union().process() } }参数解释
union_type:1->合并不去重 2->合并行去重left_table_union_columns:指定左表需要合并的列right_table_union_columns:右表需要合并的列new_columns:合并结果列的名称
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)