项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据
subject,name,score 数学,张三,88 语文,张三,92 英语,张三,77 数学,王五,65 语文,王五,87 英语,王五,90 数学,李雷,67 语文,李雷,33 英语,李雷,24 数学,宫九,77 语文,宫九,87 英语,宫九,90输出数据
+-------+----+-----+ |subject|name|score| +-------+----+-----+ | 语文|张三| 92| | 英语|王五| 90| | 英语|宫九| 90| | 数学|张三| 88| | 语文|王五| 87| | 语文|宫九| 87| | 数学|宫九| 77| | 英语|张三| 77| | 数学|李雷| 67| | 数学|王五| 65| | 语文|李雷| 33| | 英语|李雷| 24| +-------+----+-----+程序代码
package com.cch.bigdata.spark.process.sort import com.cch.bigdata.spark.process.AbstractTransform import org.apache.spark.sql.{Column, Dataframe} import scala.collection.mutable.ListBuffer //排序算子 class Sorter extends AbstractTransform{ //需要排序的列 private val columns = Array[String]("score") //指定升序还是降序 private val sorts = Array[String]("desc") override def process(): Unit = { if(columns.isEmpty){ throw new RuntimeException("排序列未指定") } if(sorts.isEmpty){ throw new RuntimeException("排序方式未指定:[asc/desc]") } if(columns.length!=sorts.length){ throw new RuntimeException("排序字段和排序规则不匹配") } //获取输入流 val df: Dataframe = loadCsv("src/main/resources/csv/score.csv",spark) //循环下标,用于迭代排序规则 var index = 0 val list:ListBuffer[Column] = ListBuffer() columns.foreach(c=>{ val sort: String = sorts(index) sort match { case "asc" =>{ list.append(df.col(c).asc) } case "desc" =>{ list.append(df.col(c).desc) } case _=>{ throw new RuntimeException("排序规则只支持asc/desc") } } index+=1 }) df.orderBy(list.map(c=>{c}):_*).show() } override def getAppName(): String = "排序" } object Sorter{ def main(args: Array[String]): Unit = { new Sorter().process() } }参数解释
columns:需要排序的列,字符串数组sorts:排序类型,字符串数组
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)