Spark Business Development: Column Value Replacement

Project repository: https://gitee.com/cch-bigdata/spark-process.git

Input data

order_number,order_date,purchaser,quantity,product_id,remark
10001,2016-01-16,1001,1,102,机q器w记e录r
10003,2016-01-17,1002,2,105,人工记录
10002,2016-01-19,1002,3,106,人工补录
10004,2016-02-21,1003,4,107,自然交易
10001,2016-01-16,1001,1,102,机器记录

Output data
+------------+-------------------+---------+--------+----------+------------+
|order_number|         order_date|purchaser|quantity|product_id|      remark|
+------------+-------------------+---------+--------+----------+------------+
|       10001|2016-01-16 00:00:00|     1001|       1|       102|机q器w记e录r|
|       10003|2016-01-17 00:00:00|     1002|       2|       105|    人工记录|
|       10002|2016-01-19 00:00:00|     1002|       3|       106|    人工补录|
|       10004|2016-02-21 00:00:00|     1003|       4|       107|    自然交易|
|       10001|2016-01-16 00:00:00|     1001|       1|       102|  计算机记录|
+------------+-------------------+---------+--------+----------+------------+
Program code
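package com.cch.bigdata.spark.process.replace

import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr, when}

class ValueReplacer extends AbstractTransform{

  case class ReplaceColumn(name:String,strategy:Int,old_value:Any,new_value:Any)


  // Columns whose values should be replaced
  private val column = Array[String]("remark")

  // Replacement strategy for each column (1: exact value, 2: regular expression)
  private val strategy = Array[String]("1")

  // Values to be replaced
  private val old_value = Array[Any]("机器记录")

  // New values after replacement
  private val new_value = Array[Any]("计算机记录")


  override def process(): Unit = {

    if(column.isEmpty){
      throw new RuntimeException("The replacement column configuration must not be empty")
    }

    if(strategy==null || strategy.length != column.length){
      throw new RuntimeException("A replacement strategy must be configured for every column")
    }

    if(old_value==null || old_value.length != column.length){
      throw new RuntimeException("A value to be replaced must be configured for every column")
    }

    if(new_value==null || new_value.length != column.length){
      throw new RuntimeException("A new value must be configured for every column")
    }


    // Load the upstream dataset
    var df: DataFrame = loadCsv("src/main/resources/csv/orders.csv",spark)


    // The four arrays are parallel: position i describes one replacement rule
    column.indices.foreach(index=>{
      val replaceColumn: ReplaceColumn = ReplaceColumn(column(index),strategy(index).toInt,old_value(index),new_value(index))

      replaceColumn.strategy match {
        case 1 =>{
          // String or numeric value replacement
          df = valueReplace(df,replaceColumn.name,replaceColumn.old_value, replaceColumn.new_value)
        }
        case 2 =>{
          // Regular-expression replacement
          df = regexpReplace(df,replaceColumn.name,replaceColumn.old_value.toString, replaceColumn.new_value)
        }
        case other =>{
          // Fail with a clear message instead of a scala.MatchError
          throw new RuntimeException("Unsupported replacement strategy: "+other)
        }
      }

    })

    df.show()


  }

  // Numeric/string replacement: only cells equal to oldValue are changed
  def valueReplace(df:DataFrame,columnName:String, oldValue:Any,newValue:Any):DataFrame={
    df.withColumn(columnName, when(col(columnName) === oldValue,newValue)
      .otherwise(col(columnName)))
  }

  // Regular-expression replacement
  // Note: the pattern and new value are spliced into a SQL string, so they must not contain single quotes
  def regexpReplace(df:DataFrame,columnName:String,regexp:String,newValue:Any):DataFrame={
    val exprString:String = "regexp_replace("+columnName+",'"+regexp+"','"+newValue+"')"
    df.withColumn(columnName,expr(exprString).alias(columnName))
  }

  // Application name ("值替换" means "value replacement")
  override def getAppName(): String = "值替换"

}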

object ValueReplacer{
  def main(args: Array[String]): Unit = {
    new ValueReplacer().process()
  }
}
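
The regexpReplace method above builds a SQL expression by concatenating strings, so a single quote in the pattern or in the new value would break the expression. One possible alternative is the `regexp_replace` function from `org.apache.spark.sql.functions`, which takes the pattern and replacement as ordinary arguments. The following is only a minimal sketch under stated assumptions: the object name `RegexpReplaceSketch`, the local SparkSession, and the two in-memory sample rows are illustrative and are not part of the spark-process project.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}

// Hypothetical helper object, not part of the spark-process project.
object RegexpReplaceSketch {

  // Replace every match of `pattern` in `columnName` with `replacement`,
  // passing both as arguments instead of splicing them into SQL text.
  def regexpReplace(df: DataFrame, columnName: String,
                    pattern: String, replacement: String): DataFrame =
    df.withColumn(columnName, regexp_replace(col(columnName), pattern, replacement))

  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession is good enough for a quick check.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("regexp-replace-sketch")
      .getOrCreate()
    import spark.implicits._

    // Two remark values taken from the input data above.
    val df = Seq((10001, "机q器w记e录r"), (10003, "人工记录"))
      .toDF("order_number", "remark")

    // Strip the ASCII lowercase letters mixed into the remark column.
    regexpReplace(df, "remark", "[a-z]", "").show()

    spark.stop()
  }
}
```

With the pattern `[a-z]` and an empty replacement, the remark of the first sample row should become 机器记录, which strategy 1 could then match exactly.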


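Parameter description

column: the columns whose values are to be replaced, as a string array
strategy: the replacement strategy (1: string or value replacement; 2: regular-expression replacement)
old_value: the value to be replaced
new_value: the new value after replacement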
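
The four parameters are parallel arrays: the entry at position i of each array describes one replacement rule. A minimal sketch of how they could be combined and checked before any Spark work starts is shown here. The names `ReplaceRule`, `ReplaceRules` and `buildRules` are hypothetical and only mirror the ReplaceColumn case class in the program code; this is not the project's own validation logic.

```scala
// Hypothetical names; mirrors the ReplaceColumn case class in the program code.
final case class ReplaceRule(name: String, strategy: Int, oldValue: Any, newValue: Any)

object ReplaceRules {

  // Combine the parallel configuration arrays into one rule per column,
  // rejecting mismatched lengths and unknown strategies up front.
  def buildRules(column: Array[String], strategy: Array[String],
                 oldValue: Array[Any], newValue: Array[Any]): Seq[ReplaceRule] = {
    require(column.nonEmpty, "column must not be empty")
    require(
      Seq(strategy.length, oldValue.length, newValue.length).forall(_ == column.length),
      "column, strategy, old_value and new_value must have the same length")

    column.indices.map { i =>
      val s = strategy(i).toInt
      // 1: string or value replacement, 2: regular-expression replacement
      require(s == 1 || s == 2, s"unsupported strategy $s for column ${column(i)}")
      ReplaceRule(column(i), s, oldValue(i), newValue(i))
    }
  }
}
```

For the configuration used in this article, `ReplaceRules.buildRules(Array("remark"), Array("1"), Array[Any]("机器记录"), Array[Any]("计算机记录"))` yields a single rule for the remark column, while a strategy other than 1 or 2 is rejected with an IllegalArgumentException.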

Original article: http://outofmemory.cn/zaji/5709716.html
