根据order值进行跳跃顺序排序,生成seq字段,同时生成索引值。
跳跃顺序排序如order值相同,则排序是一样的,下一个排序会跳过,例如第1条数据的order值和第 2 条的值不同,第2条的order值与后2条的值相同,第4条数据的order值和第 5 条的值不同,则排序的 seq 值为 1,2,2,2,5。
**内有详情注释
import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object task { def main(args: Array[String]): Unit = { val aaaa: SparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("task") val sc = new SparkContext(aaaa) //sum累加器用于对index自增列的赋值和seq跳跃顺序排序 val sum: LongAccumulator = sc.longAccumulator("sum") //读取json数据 val data: RDD[String] = sc.textFile("hdfs://master:9000/task.json") //解析json数据,并根据字段order分组 val value: RDD[(String, Iterable[JSONObject])] = data.map(x => JSON.parseObject(x)).groupBy(x => x.getString("order")) //添加,修改 val value1: RDD[List[JSONObject]] = value.map(x => { //将分组后的迭代对象变成列表l val l: List[JSONObject] = x._2.toList var j = 1 var i = 1 //实现对seq的跳跃顺序的赋值 if (sum.value.toInt >= l.length) { j += sum.value.toInt } //映射列表l,实现对列表l插入seq与index列 val objects: List[JSONObject] = l.map(x => { //插入新列并赋值 x.put("seq", j) sum.add(1) //添加自增列 x.put("index", sum.value) x }) objects }) //得到每一个list中的json数据 val value2: RDD[JSONObject] = value1.flatMap(x => { x.iterator }) //保存文件,并设置分区为1 value2.repartition(1).saveAsTextFile("hdfs://master:9000/diliveryoutput1") } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)