项目地址：https://gitee.com/cch-bigdata/spark-process.git

输入数据
```
order_number,order_date,purchaser,quantity,product_id,remark
10001,2016-01-16,1001,1,102,机q器w记e录r
10003,2016-01-17,1002,2,105,人工记录
10002,2016-01-19,1002,3,106,人工补录
10004,2016-02-21,1003,4,107,自然交易
10001,2016-01-16,1001,1,102,机器记录
```

输出数据
```
+------------+-------------------+---------+--------+----------+------------+
|order_number|         order_date|purchaser|quantity|product_id|      remark|
+------------+-------------------+---------+--------+----------+------------+
|       10001|2016-01-16 00:00:00|     1001|       1|       102|机q器w记e录r|
|       10003|2016-01-17 00:00:00|     1002|       2|       105|    人工记录|
|       10002|2016-01-19 00:00:00|     1002|       3|       106|    人工补录|
|       10004|2016-02-21 00:00:00|     1003|       4|       107|    自然交易|
|       10001|2016-01-16 00:00:00|     1001|       1|       102|  计算机记录|
+------------+-------------------+---------+--------+----------+------------+
```

程序代码
package com.cch.bigdata.spark.process.replace

import com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_replace, when}

import scala.util.Try

/**
 * Replaces values in one or more columns of the upstream dataset.
 *
 * The four configuration arrays (`column`, `strategy`, `old_value`, `new_value`) are
 * index-aligned: entry `i` of each array together describes the rule applied to `column(i)`.
 * Supported strategy codes:
 *  - `"1"`: exact match — cells equal to `old_value(i)` become `new_value(i)`;
 *  - `"2"`: regular expression — every match of `old_value(i)` is rewritten to `new_value(i)`.
 */
class ValueReplacer extends AbstractTransform {

  /** One resolved replacement rule for a single column. */
  case class ReplaceColumn(name: String, strategy: Int, old_value: Any, new_value: Any)

  // Columns whose values are replaced.
  private val column = Array[String]("remark")
  // Replacement strategy per column ("1" = exact value, "2" = regular expression).
  private val strategy = Array[String]("1")
  // Value (strategy 1) or regular expression (strategy 2) to be replaced, per column.
  private val old_value = Array[Any]("机器记录")
  // Replacement value per column.
  private val new_value = Array[Any]("计算机记录")

  /**
   * Validates the configuration, loads the input CSV, applies every rule in column order
   * and prints the resulting dataset.
   *
   * @throws RuntimeException if a configuration array is missing/empty, the arrays differ
   *                          in length, or a strategy code is not `1` or `2`
   */
  override def process(): Unit = {
    if (column == null || column.isEmpty) {
      throw new RuntimeException("替换列配置不能为空")
    }
    if (old_value == null) {
      throw new RuntimeException("替换列被替换值配置不能为空")
    }
    if (new_value == null) {
      throw new RuntimeException("替换列新值配置不能为空")
    }
    // Rules are read by index, so every array must line up with `column`; otherwise the
    // job would die with an opaque ArrayIndexOutOfBoundsException.
    if (strategy == null ||
        Seq(strategy.length, old_value.length, new_value.length).exists(_ != column.length)) {
      throw new RuntimeException("替换列、替换方式、被替换值、新值的配置数量必须一致")
    }

    // Resolve (and validate) every rule before touching any data.
    val rules = column.indices.map { i =>
      ReplaceColumn(column(i), parseStrategy(strategy(i)), old_value(i), new_value(i))
    }

    // Upstream dataset.
    val source: DataFrame = loadCsv("src/main/resources/csv/orders.csv", spark)

    val result = rules.foldLeft(source) { (df, rule) =>
      rule.strategy match {
        // Plain string / numeric replacement.
        case 1 => valueReplace(df, rule.name, rule.old_value, rule.new_value)
        // Regular-expression replacement.
        case 2 => regexpReplace(df, rule.name, rule.old_value.toString, rule.new_value)
      }
    }

    result.show()
  }

  /**
   * Parses a strategy code, accepting only `1` (exact value) and `2` (regex).
   *
   * @throws RuntimeException for any other or non-numeric code
   */
  private def parseStrategy(raw: String): Int =
    Try(raw.trim.toInt).toOption
      .filter(code => code == 1 || code == 2)
      .getOrElse(throw new RuntimeException(s"不支持的替换方式: $raw"))

  /**
   * Replaces cells of `columnName` equal to `oldValue` with `newValue`; other cells are kept.
   *
   * Uses null-safe equality (`<=>`), so `oldValue = null` replaces null cells, while for any
   * non-null `oldValue` null cells are left untouched.
   *
   * @param df         input dataset
   * @param columnName column to rewrite
   * @param oldValue   value to look for
   * @param newValue   value written in place of matching cells
   * @return `df` with `columnName` rewritten
   */
  def valueReplace(df: DataFrame, columnName: String, oldValue: Any, newValue: Any): DataFrame =
    df.withColumn(
      columnName,
      when(col(columnName) <=> oldValue, newValue).otherwise(col(columnName)))

  /**
   * Rewrites every match of `regexp` in `columnName` with `newValue`.
   *
   * Uses the Column API instead of a SQL string built by concatenation, so quotes in the
   * pattern/replacement cannot break the expression and backslashes in the regex (e.g. `\d`)
   * reach the regex engine unchanged rather than being unescaped by the SQL parser.
   * Group references such as `$1` in `newValue` follow Java regex replacement syntax.
   *
   * @param df         input dataset
   * @param columnName column to rewrite
   * @param regexp     Java regular expression to match
   * @param newValue   replacement; converted with `String.valueOf`
   * @return `df` with `columnName` rewritten
   */
  def regexpReplace(df: DataFrame, columnName: String, regexp: String, newValue: Any): DataFrame =
    df.withColumn(columnName, regexp_replace(col(columnName), regexp, String.valueOf(newValue)))

  override def getAppName(): String = "值替换"
}

object ValueReplacer {
  def main(args: Array[String]): Unit = {
    new ValueReplacer().process()
  }
}

参数解释
- column：需要进行值替换的列，字符串数组
- strategy：替换策略（1：字符串或数值替换；2：使用正则表达式替换）
- old_value：需要被替换的值（策略为 2 时为正则表达式）
- new_value：替换后的新值

以上四个数组按下标一一对应，长度应相同：第 i 个元素共同描述对 column(i) 的替换规则。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)