[ Spark mllib ] 聚类K-means案例——2021挖掘

[ Spark mllib ] 聚类K-means案例——2021挖掘,第1张

[ Spark mllib ] 聚类K-means案例——2021挖掘

文章目录
  • 目的与要求
  • 数据格式
    • 数据读取与清洗
    • 聚合K-Means *** 作
    • 数据的再次清洗和拼接
    • 数据的输出打印
    • 完整代码
    • 最终结果

目的与要求
  1. 筛选北京地区(“城市”字段为“北京市”)商家数据记录形成筛选数据集。
  2. 根据北京地区商家的经纬度属性,对商家进行 k-means 聚类,聚类数设为 5,迭代次数为 2000 次。
  3. 打印语句输出聚类中心、每个类的商家数,以及该类所包含的商圈。(打印格式:=cluster 0: 聚类中心为[,],商家数为***个,包含商圈:【***,***,…】=)
数据格式

数据读取与清洗

读取、解析json文件。

val jsondata: RDD[JSONObject] = sc.textFile("D:\task.json").map(line => JSON.parseObject(line.toString))

将解析的json文件数据进行数据的进行过滤,筛选得到城市名为北京的数据集。

val data: RDD[JSONObject] = jsondata.filter(line => {
      line.getString("city_name") == "北京市"
    })

将筛选得到的数据集进行清洗,将其向量化。

val vectors: RDD[linalg.Vector] = data.map(line => {
      //取得经纬度使得变为数组
      val features = Array[Double(line.getString("latitude").toDouble,line.getString("longitude").toDouble)
      //向量化
      Vectors.dense(features)
    })
聚合K-Means *** 作

第一参数为清洗后的向量化数据,第二参数为期望的聚类的个数,第三参数为迭代次数。

val model = KMeans.train(vectors, 5, 2000)

可通过遍历打印查看聚类中心点以及给定代表的索引,或聚类的个数。

var clusterIndex = 0;
model.clusterCenters.foreach(cen => {
      print(clusterIndex + ":")
      println(cen.toString)
      clusterIndex += 1
    })
println("Cluster Number:" + value1.clusterCenters.length)
数据的再次清洗和拼接

映射一个新的rdd,用来统计商家数量,假设这里的每一条数据代表每一个不同商家。
用(聚类中心的索引,1)通过聚类中心的索引统计商家数量。

val marketers_number = data.map(line => {
      val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble)
      val i: Int = value1.predict(Vectors.dense(features))
      (i, 1)
    }).reduceByKey(_ + _)

用(聚类中心的索引,商圈)通过聚类中心的索引对其商圈进行分组(#注:商圈为location)。

val shopping = data.map(line => {
      val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble)
      val i: Int = value1.predict(Vectors.dense(features))
      (i, line.getString("location"))
    }).groupByKey()
数据的输出打印

join通过相同key(聚类中心的索引)进行拼接,使得数据变为(聚类中心,(商家数量,商圈))。
sortby通过x._1(聚类中心的索引)升序排序,默认为升序,降序则添加参数false。
遍历打印,将我们得到的聚类中心通过索引进行匹配。

marketers_number.join(shopping).sortBy(x => x._1).collect().foreach(x => {
      var centre = ""
      if (x._1 == 0) {
        centre = "[39.963067343283576,116.34161625870645]"
      }
      if (x._1 == 1) {
        centre = "[39.824223999999994,116.67271499999998]"
      }
      if (x._1 == 2) {
        centre = "[40.03647706976744,116.27083911627908]"
      }
      if (x._1 == 3) {
        centre = "[40.00760493333333,116.57222786666668]"
      }
      if (x._1 == 4) {
        centre = "[40.044718465116276,116.40492395348836]"
      }
      println("=cluster " + x._1 + ": 聚类中心为" + centre + ",商家数为" + x._2._1 + "个,包含商圈:【" + x._2._2.toString().replace("CompactBuffer(", "").replace(")", "") + "】")
    })
完整代码
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object task9 {
  def main(args: Array[String]): Unit = {
    //环境配置
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("task")
    val sc = new SparkContext(conf)
    val jsondata: RDD[JSONObject] = sc.textFile("D:\2021\diliveryoutput1").map(line => JSON.parseObject(line.toString))
    //筛选出城市名为北京市的数据
    val data: RDD[JSONObject] = jsondata.filter(line => {
      line.getString("city_name") == "北京市" && line.getString("longitude") != "纬度"
    })
   
    val vectors: RDD[linalg.Vector] = data.map(line => {
      //取得经纬度使得变为数组
      val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble)
      Vectors.dense(features)
    })
    // 第一参数为清洗后的向量化数据,第二参数为期望的聚类的个数,第三参数为迭代次数
    val value1 = KMeans.train(vectors, 5, 2000)
    
    //遍历打印查看聚类中心点以及给定代表的索引
    var clusterIndex = 0;
    value1.clusterCenters.foreach(c => {
      print(clusterIndex + ":")
      println(c.toString)
      clusterIndex += 1
    })
    val cost = value1.computeCost(vectors)
    println("Within Set Sum of Squared Errors=" + cost.toString)
   
    value1.clusterCenters.foreach(c => println(c))
   
    val marketers_number = data.map(line => {
      val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble)
      val i: Int = value1.predict(Vectors.dense(features))
      (i, 1)
    }).reduceByKey(_ + _)

    val shopping = data.map(line => {
      val features = Array[Double](line.getString("latitude").toDouble, line.getString("longitude").toDouble)
      val i: Int = value1.predict(Vectors.dense(features))
      (i, line.getString("location"))
    }).groupByKey()
    marketers_number.join(shopping).sortBy(x => x._1).collect().foreach(x => {
      var centre = ""
      if (x._1 == 0) {
        centre = "[39.963067343283576,116.34161625870645]"
      }
      if (x._1 == 1) {
        centre = "[39.824223999999994,116.67271499999998]"
      }
      if (x._1 == 2) {
        centre = "[40.03647706976744,116.27083911627908]"
      }
      if (x._1 == 3) {
        centre = "[40.00760493333333,116.57222786666668]"
      }
      if (x._1 == 4) {
        centre = "[40.044718465116276,116.40492395348836]"
      }
      println("=cluster " + x._1 + ": 聚类中心为" + centre + ",商家数为" + x._2._1 + "个,包含商圈:【" + x._2._2.toString().replace("CompactBuffer(", "").replace(")", "") + "】")
    })
  }

}
最终结果

这里因为商圈数量居多,故取商圈的前两个,后以…代替输出作为结果。

=cluster 0: 聚类中心为[39.963067343283576,116.34161625870645],商家数为185个,包含商圈:【中关村南三街, 科学院南路, …】=
=cluster 1: 聚类中心为[39.824223999999994,116.67271499999998],商家数为5个,包含商圈:【燕昌路, 康复路, …】=
=cluster 2: 聚类中心为[40.03647706976744,116.27083911627908],商家数为26个,包含商圈:【泥洼路, 彰化路,…】=
=cluster 3: 聚类中心为[40.00760493333333,116.57222786666668],商家数为48个,包含商圈:【双营路, 来广营东路,…】=
=cluster 4: 聚类中心为[40.044718465116276,116.40492395348836],商家数为45个,包含商圈:【南花市大街, 大宏巷,…】=

用简单的思路做题。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5610932.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存