例如我们有一个基于用户(userId)对物品(itemId)进行打分(vote)的数据源
然后我们想根据用户的协同过滤算法计算某用户的TopN相似用户
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.types.StructType import org.apache.spark.sql.functions._ object Test { case class UserItem(userId:String,itemId:String,score:Double) def main(args: Array[String]): Unit = { //导入数据 val spark = SparkSession.builder() .master("local") .getOrCreate() val df = spark.read.format("csv") .option("header", true) .option("delimiter", ",") .option("inferschema", true) .schema(ScalaReflection.schemaFor[UserItem] .dataType.asInstanceOf[StructType]) .load("~/cf_user_based.csv") df.show(false) // 通过余弦相似度计算用户的相似度 // 余弦相似度的公式 : (A * B) / (|A| * |B|) // 分母 每个向量的模的乘积 import spark.implicits._ val dfScoreMod = df.rdd.map(x=>(x(0).toString,x(2).toString)) .groupByKey() //按照用户id进行分组 .mapValues(score=>math.sqrt( score.toArray.map( itemScore=>math.pow(itemScore.toDouble,2) ).reduce(_+_) // ((物品a的打分)**2 + (物品b的打分)**2 .. (物品n的打分)**2))** 1/2 )) .toDF("userId","mod") dfScoreMod.show(false) //分子 val _dfTmp = df.select( col("userId").as("_userId"), col("itemId"), col("score").as("_score") ) //这里会引起shuffle,大家思考下如何优化 //这里目的是把两两用户都放到了同一张表里 val _df = df.join(_dfTmp,df("itemId") === _dfTmp("itemId")) .where( df("userId") =!= _dfTmp("_userId") ) .select( df("itemId"), df("userId"), _dfTmp("_userId"), df("score"), _dfTmp("_score") ) _df.show(false) // 两两向量的维度乘积并加总 val df_mol = _df.select( col("userId"), col("_userId"), (col("score") * col("_score")) .as("score_mol") //用户a和用户b对各自对同一个物品打分的乘积 ).groupBy(col("userId"),col("_userId")) .agg(sum("score_mol")) //加总 .withColumnRenamed( "sum(score_mol)", "mol" ) df_mol.show(false) // 计算两两向量的余弦相似度 val _dfScoreMod = dfScoreMod.select( col("userId").as("_userId"), col("mod").as("_mod") ) //这里也会引起shuffle,大家思考下如何优化 //分子表(df_mol)和分母表(dfScoreMod)都放在一张表里 val sim = df_mol.join( dfScoreMod, df_mol("userId") === dfScoreMod("userId") ).join( _dfScoreMod, df_mol("_userId") === _dfScoreMod("_userId") ).select( df_mol("userId"), df_mol("_userId"), df_mol("mol"), dfScoreMod("mod"), _dfScoreMod("_mod") ) sim.show(false) // 分子 / 分母 val cos_sim = sim.select( col("userId"), col("_userId"), (col("mol") / (col("mod") * col("_mod"))) .as("cos_sim") ) cos_sim.show(false) // 列出某个用户的TopN相似用户 val topN = cos_sim.rdd.map(x=>( (x(0).toString, (x(1).toString,x(2).toString) ) // 形成 (uid1,(uid2,cos_sim)) )).groupByKey() .mapValues(_.toArray.sortWith((x,y)=>x._2 > y._2)) //根据相似度排序 .flatMapValues(x=>x) .toDF("userId","sim_sort") .select( col("userId"), col("sim_sort._1").as("_userId"), col("sim_sort._2").as("cos_sim") ).where(col("userId") === "1") topN.show(false) } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)