Spark解析json文件,实现跳跃顺序排序

Spark解析json文件,实现跳跃顺序排序,第1张

Spark解析json文件,实现跳跃顺序排序 目的要求

根据order值进行跳跃顺序排序,生成seq字段,同时生成索引值。

跳跃顺序排序

如order值相同,则排序是一样的,下一个排序会跳过,例如第1条数据的order值和第 2 条的值不同,第2条的order值与后2条的值相同,第4条数据的order值和第 5 条的值不同,则排序的 seq 值为 1,2,2,2,5。

数据格式

代码实现

**内有详情注释

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object task {
  def main(args: Array[String]): Unit = {
    val aaaa: SparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("task")
    val sc = new SparkContext(aaaa)
    //sum累加器用于对index自增列的赋值和seq跳跃顺序排序
    val sum: LongAccumulator = sc.longAccumulator("sum")
    //读取json数据
    val data: RDD[String] = sc.textFile("hdfs://master:9000/task.json")
    //解析json数据,并根据字段order分组
    val value: RDD[(String, Iterable[JSONObject])] = data.map(x => JSON.parseObject(x)).groupBy(x => x.getString("order"))
    //添加,修改
    val value1: RDD[List[JSONObject]] = value.map(x => {
      //将分组后的迭代对象变成列表l
      val l: List[JSONObject] = x._2.toList
      var j = 1
      var i = 1
      //实现对seq的跳跃顺序的赋值
      if (sum.value.toInt >= l.length) {
        j += sum.value.toInt
      }
      //映射列表l,实现对列表l插入seq与index列
      val objects: List[JSONObject] = l.map(x => {
        //插入新列并赋值
        x.put("seq", j)
        sum.add(1)
        //添加自增列
        x.put("index", sum.value)
        x
      })
      objects
    })
    //得到每一个list中的json数据
    val value2: RDD[JSONObject] = value1.flatMap(x => {
      x.iterator
    })
    //保存文件,并设置分区为1
    value2.repartition(1).saveAsTextFile("hdfs://master:9000/diliveryoutput1")

  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5699762.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存