spark业务开发-union合并(union)

spark业务开发-union合并(union),第1张

spark业务开发-union合并(union) spark业务开发-union合并(union)

项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1

id,name,profession,enroll,score
1,庄劲聪,经济学类,北京理工大学,551
2,吴雅思,经济学类,北京理工大学,529
3,周育传,经济学类,北京理工大学,682
4,丁俊伟,通信工程,北京电子科技学院,708
5,庄逸琳,通信工程,北京电子科技学院,708
6,吴志发,通信工程,北京电子科技学院,578
7,肖妮娜,通信工程,北京电子科技学院,557
8,蔡建明,通信工程,北京电子科技学院,583
9,林逸翔,通信工程,北京电子科技学院,543

输入数据集2
id,name,profession,enroll,score
1,曾凰妹,金融学,北京电子科技学院,637
2,谢德炜,金融学,北京电子科技学院,542
4,王丽云,金融学,北京电子科技学院,626
5,吴鸿毅,金融学,北京电子科技学院,591
6,施珊珊,经济学类,北京理工大学,581
7,柯祥坤,经济学类,北京理工大学,650
1,庄劲聪,经济学类,北京理工大学,551

输出数据
+---+------+----------------+
| id|  名称|            学院|
+---+------+----------------+
|  1|庄劲聪|            null|
|  2|吴雅思|            null|
|  3|周育传|            null|
|  4|丁俊伟|            null|
|  5|庄逸琳|            null|
|  6|吴志发|            null|
|  7|肖妮娜|            null|
|  8|蔡建明|            null|
|  9|林逸翔|            null|
|  1|曾凰妹|北京电子科技学院|
|  2|谢德炜|北京电子科技学院|
|  4|王丽云|北京电子科技学院|
|  5|吴鸿毅|北京电子科技学院|
|  6|施珊珊|    北京理工大学|
|  7|柯祥坤|    北京理工大学|
|  1|庄劲聪|    北京理工大学|
+---+------+----------------+
程序代码
package com.cch.bigdata.spark.process.union

import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{Column, Dataframe}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

//union合并 *** 作
class Union extends AbstractTransform{


  //1->合并不去重 2->合并行去重
  private val union_type_set = Set(1,2)
  private val union_type = 1

  //表1需要合并的列名
  private val left_table_union_columns = Array[String]("id","name","aaa")

  //表2需要合并的列名
  private val right_table_union_columns = Array[String]("id","name","enroll")

  //合并后的列的名称
  //如果没有设置,则使用表1的列名
  private val new_columns = Array[String]("id","名称","学院")


  override def process(): Unit = {


    if(!union_type_set.contains(union_type)){
      throw new RuntimeException("合并类型错误!")
    }

    if(left_table_union_columns.isEmpty){
      throw new RuntimeException("表1合并列不能为空!")
    }

    if(right_table_union_columns.isEmpty){
      throw new RuntimeException("表2合并列不能为空!")
    }

    //处理表1列名称和别名映射
    val nameMapping = new mutable.linkedHashMap[String, String]()
    var index = 0
    left_table_union_columns.foreach(c=>{
      nameMapping(c) = new_columns(index)
      index+=1
    })


    //第一个数据输入
    val firstDF: Dataframe = loadCsv("src/main/resources/csv/admission_1.csv",spark)
    //获取第一个数据输入的schema
    val firstColumnSet:Set[String] = firstDF.schema.fieldNames.toSet

    //第二个数据输入
    val secondDF: Dataframe = loadCsv("src/main/resources/csv/admission_2.csv",spark)
    //获取第二个数据输入的schema
    val secondColumnSet: Set[String] = secondDF.schema.fieldNames.toSet

    //构造表1的select参数
    val leftColumnList: ListBuffer[Column] = new ListBuffer()
    //表1需要添加的新列
    val withLeftColumnList: ListBuffer[String] = new ListBuffer()

    for ((k, v) <- nameMapping){
      if(firstColumnSet.contains(k)){
        //这个配置的union列是表1中的
        leftColumnList.append(col(k).as(v))
      }else{
        //这个是个新列
        withLeftColumnList.append(k)
      }
    }

    var leftQueryDf: Dataframe = firstDF.select(leftColumnList.map(c => {
      c
    }): _*)
    withLeftColumnList.foreach(c=>{
      var asName: String = nameMapping(c)
      if(asName.isEmpty){
        asName = c
      }
      leftQueryDf = leftQueryDf.withColumn(asName,lit(null))
    })


    //构造表2的select参数
    val rightColumnList: ListBuffer[Column] = new ListBuffer()
    //表2需要添加的新列
    val withRightColumnList: ListBuffer[String] = new ListBuffer()
    right_table_union_columns.foreach(c=>{
      if(secondColumnSet.contains(c)){
        //这个配置的union列是表2中的
        rightColumnList.append(col(c))
      }else{
        //这个是表2的新列
        withRightColumnList.append(c)
      }
    })

    var rightQueryDf: Dataframe = secondDF.select(rightColumnList.map(c => {
      c
    }): _*)
    withRightColumnList.foreach(c=>{
      rightQueryDf = rightQueryDf.withColumn(c,lit(null))
    })


    union_type match {
      case 1 =>{
        //不去重合并
        leftQueryDf.union(rightQueryDf).show()
      }

      case 2 =>{
        //去重合并
        leftQueryDf.union(rightQueryDf).dropDuplicates().show()
      }
    }
  }

  override def getAppName(): String = "union合并"
}

object Union{
  def main(args: Array[String]): Unit = {
    new Union().process()
  }
}


参数解释

union_type:1->合并不去重 2->合并行去重left_table_union_columns:指定左表需要合并的列right_table_union_columns:右表需要合并的列new_columns:合并结果列的名称

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705427.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存