- 目的与要求
- 数据格式
- 数据读取与清洗
- 聚合K-Means *** 作
- 数据的再次清洗和拼接
- 数据的输出打印
- 完整代码
- 最终结果
- 筛选北京地区(“城市”字段为“北京市”)商家数据记录形成筛选数据集。
- 根据北京地区商家的经纬度属性,对商家进行 k-means 聚类,聚类数设为 5,迭代次数为 2000 次。
- 打印语句输出聚类中心、每个类的商家数,以及该类所包含的商圈。(打印格式:=cluster 0: 聚类中心为[,],商家数为***个,包含商圈:【***,***,…】=)
读取、解析json文件。
val jsondata: RDD[JSONObject] = sc.textFile("D:\task.json").map(line => JSON.parseObject(line.toString))
将解析的json文件数据进行数据的进行过滤,筛选得到城市名为北京的数据集。
val data: RDD[JSONObject] = jsondata.filter(line => { line.getString("city_name") == "北京市" })
将筛选得到的数据集进行清洗,将其向量化。
val vectors: RDD[linalg.Vector] = data.map(line => { //取得经纬度使得变为数组 val features = Array[Double(line.getString("latitude").toDouble,line.getString("longitude").toDouble) //向量化 Vectors.dense(features) })聚合K-Means *** 作
第一参数为清洗后的向量化数据,第二参数为期望的聚类的个数,第三参数为迭代次数。
val model = KMeans.train(vectors, 5, 2000)
可通过遍历打印查看聚类中心点以及给定代表的索引,或聚类的个数。
var clusterIndex = 0; model.clusterCenters.foreach(cen => { print(clusterIndex + ":") println(cen.toString) clusterIndex += 1 }) println("Cluster Number:" + value1.clusterCenters.length)数据的再次清洗和拼接
映射一个新的rdd,用来统计商家数量,假设这里的每一条数据代表每一个不同商家。
用(聚类中心的索引,1)通过聚类中心的索引统计商家数量。
val marketers_number = data.map(line => { val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble) val i: Int = value1.predict(Vectors.dense(features)) (i, 1) }).reduceByKey(_ + _)
用(聚类中心的索引,商圈)通过聚类中心的索引对其商圈进行分组(#注:商圈为location)。
val shopping = data.map(line => { val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble) val i: Int = value1.predict(Vectors.dense(features)) (i, line.getString("location")) }).groupByKey()数据的输出打印
join通过相同key(聚类中心的索引)进行拼接,使得数据变为(聚类中心,(商家数量,商圈))。
sortby通过x._1(聚类中心的索引)升序排序,默认为升序,降序则添加参数false。
遍历打印,将我们得到的聚类中心通过索引进行匹配。
marketers_number.join(shopping).sortBy(x => x._1).collect().foreach(x => { var centre = "" if (x._1 == 0) { centre = "[39.963067343283576,116.34161625870645]" } if (x._1 == 1) { centre = "[39.824223999999994,116.67271499999998]" } if (x._1 == 2) { centre = "[40.03647706976744,116.27083911627908]" } if (x._1 == 3) { centre = "[40.00760493333333,116.57222786666668]" } if (x._1 == 4) { centre = "[40.044718465116276,116.40492395348836]" } println("=cluster " + x._1 + ": 聚类中心为" + centre + ",商家数为" + x._2._1 + "个,包含商圈:【" + x._2._2.toString().replace("CompactBuffer(", "").replace(")", "") + "】") })完整代码
import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.linalg import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object task9 { def main(args: Array[String]): Unit = { //环境配置 val conf: SparkConf = new SparkConf().setMaster("local").setAppName("task") val sc = new SparkContext(conf) val jsondata: RDD[JSONObject] = sc.textFile("D:\2021\diliveryoutput1").map(line => JSON.parseObject(line.toString)) //筛选出城市名为北京市的数据 val data: RDD[JSONObject] = jsondata.filter(line => { line.getString("city_name") == "北京市" && line.getString("longitude") != "纬度" }) val vectors: RDD[linalg.Vector] = data.map(line => { //取得经纬度使得变为数组 val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble) Vectors.dense(features) }) // 第一参数为清洗后的向量化数据,第二参数为期望的聚类的个数,第三参数为迭代次数 val value1 = KMeans.train(vectors, 5, 2000) //遍历打印查看聚类中心点以及给定代表的索引 var clusterIndex = 0; value1.clusterCenters.foreach(c => { print(clusterIndex + ":") println(c.toString) clusterIndex += 1 }) val cost = value1.computeCost(vectors) println("Within Set Sum of Squared Errors=" + cost.toString) value1.clusterCenters.foreach(c => println(c)) val marketers_number = data.map(line => { val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble) val i: Int = value1.predict(Vectors.dense(features)) (i, 1) }).reduceByKey(_ + _) val shopping = data.map(line => { val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble) val i: Int = value1.predict(Vectors.dense(features)) (i, line.getString("location")) }).groupByKey() marketers_number.join(shopping).sortBy(x => x._1).collect().foreach(x => { var centre = "" if (x._1 == 0) { centre = "[39.963067343283576,116.34161625870645]" } if (x._1 == 1) { centre = "[39.824223999999994,116.67271499999998]" } if (x._1 == 2) { centre = "[40.03647706976744,116.27083911627908]" } if (x._1 == 3) { centre = "[40.00760493333333,116.57222786666668]" } if (x._1 == 4) { centre = "[40.044718465116276,116.40492395348836]" } println("=cluster " + x._1 + ": 聚类中心为" + centre + ",商家数为" + x._2._1 + "个,包含商圈:【" + x._2._2.toString().replace("CompactBuffer(", "").replace(")", "") + "】") }) } }最终结果
这里因为商圈数量居多,故取商圈的前两个,后以…代替输出作为结果。
=cluster 0: 聚类中心为[39.963067343283576,116.34161625870645],商家数为185个,包含商圈:【中关村南三街, 科学院南路, …】=
=cluster 1: 聚类中心为[39.824223999999994,116.67271499999998],商家数为5个,包含商圈:【燕昌路, 康复路, …】=
=cluster 2: 聚类中心为[40.03647706976744,116.27083911627908],商家数为26个,包含商圈:【泥洼路, 彰化路,…】=
=cluster 3: 聚类中心为[40.00760493333333,116.57222786666668],商家数为48个,包含商圈:【双营路, 来广营东路,…】=
=cluster 4: 聚类中心为[40.044718465116276,116.40492395348836],商家数为45个,包含商圈:【南花市大街, 大宏巷,…】=
用简单的思路做题。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)