相关参考内容一.数据来源二.采用HDFS对数据集进行存储和管理三.采用pyspark的交互式程序,根据用户的评分数据ratings.csv,采用协同过滤算法进行电影的推荐。
3.1获取hdfs地址3.2导入相关包3.3读取数据并展示3.4数据处理3.5划分训练集和测试集,对数据进行训练学习得到模型3.6模型评估3.7文件存储 四.相关代码如下
相关参考内容更改虚拟机默认python版本为3.5
推荐系统实战数据集下载地址
Pyspark.sql Dataframe 创建、 *** 作、输出
Spark 2.1.0 入门:协同过滤算法(Python版)
厦门大学数据库实验室
1834 位用户对电影的打分数据,总共1028751条。
数据说明:
ratings.csv 文件包含如下所述的标题:
userId:散列的 user_id。
movieId:用户(对应userId)评分的电影的id。
评级:用户提供的评级(从 0.5 到 5 星)。
tstamp:用户评价电影的时间。
将获取到的rating数据集上传到虚拟机的:r"/usr/local/spark2/mycode/test_1/ratings.csv处。
启动hadoop,进行hdfs存储数据
cd /usr/local/hadoop ./sbin/start-dfs.sh
将本地数据:ratings.csv上传致hdfs中的 /user/hadoop/input 中
/bin/hdfs dfs -put /usr/local/spark2/mycode/test_1/ratings.csv input ./bin/hdfs dfs -ls input三.采用pyspark的交互式程序,根据用户的评分数据ratings.csv,采用协同过滤算法进行电影的推荐。 3.1获取hdfs地址
hdfs getconf -confKey fs.default.name
启动pyspark
from pyspark.sql import SQLContext from pyspark.ml.evaluation import Regressionevaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row import pandas as pd import numpy as np3.3读取数据并展示
sc = SparkContext() sqlcontext = SQLContext(sc) data=sqlcontext.read.format("com.databricks.spark.csv").options(header="true",inferschema="true").load("hdfs://localhost:9000/user/hadoop/input/ratings.csv") data.show()3.4数据处理
需要进数据都转化为数字型数据,所以useri一栏需要进行处理。
tem=temdata['useri'].unique().tolist() user=np.arange(0,len(tem),1) temdata['useri']=temdata['useri'].map(dict(zip(tem,user))) temdata=temdata.iloc[:,0:3] spark.conf.set("spark.sql.execution.arrow.enabled", "true") df = spark.createDataframe(temdata)
转化后的数据如下
使用ALS来建立推荐模型,这里我们构建了两个模型,一个是显性反馈,一个是隐性反馈。
training, test = df.randomSplit([0.8,0.2]) alsExplicit = ALS(maxIter=5, regParam=0.01, userCol="useri", itemCol=" movie_id", ratingCol=" rating") alsImplicit = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,userCol="useri", itemCol=" movie_id", ratingCol=" rating") modelExplicit = alsExplicit.fit(training) modelImplicit = alsImplicit.fit(training) predictionsExplicit = modelExplicit.transform(test) predictionsImplicit = modelImplicit.transform(test) pre_EX=predictionsExplicit.where("prediction!='NaN'") pre_IM=predictionsImplicit.where("prediction!='NaN'")
我们把结果输出,对比一下真实结果与预测结果
通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确
evaluator=Regressionevaluator().setMetricName("rmse").setLabelCol(" rating").setPredictionCol("prediction") rmseExplicit=evaluator.evaluate(pre_EX) rmseIMplicit=evaluator.evaluate(pre_IM) print("Explicit:Root-mean-square error = "+str(rmseExplicit)) print("Explicit:Root-mean-square error = "+str(rmseIMplicit))
可以看到显性反馈模型打分的均方差值为0.82,说明预测较好。
将每个用户的预测评分情况输出打分最高的5部电影的得分和名称进行电影的推荐:
b=pre_EX.toPandas() b.groupby('useri').max('prediction').head(5) column = b['useri'].unique() recomend_list={} for i in column: tem=b[b['useri'] == i].head(5)[' movie_id'].to_list() recomend_list[i]=tem column = b['useri'].unique() recomend_list={} for i in column: tem=b[b['useri'] == i].head(5)[' movie_id'].to_list() recomend_list[i]=tem recomend_list
这是为每个用户生成的5部电影推荐列表。
将推荐文档进行存储为:recommend_movie.csv
tem_l1=[] tem_l2=[] for i,j in recomend_list.items(): tem_l1.append(i) tem_l2.append(j) rec_csv=pd.Dataframe({'user':tem_l1,'recommend_movie':tem_l2}) rec_csv.to_csv(r"/usr/local/spark2/mycode/test_1/recommend_movie.csv")四.相关代码如下
from pyspark.sql import SQLContext from pyspark.ml.evaluation import Regressionevaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row import pandas as pd import numpy as np sc = SparkContext() sqlcontext = SQLContext(sc) data=sqlcontext.read.format("com.databricks.spark.csv").options(header="true",inferschema="true").load("hdfs://localhost:9000/user/hadoop/input/ratings.csv") data.show() tem=temdata['useri'].unique().tolist() user=np.arange(0,len(tem),1) temdata['useri']=temdata['useri'].map(dict(zip(tem,user))) temdata=temdata.iloc[:,0:3] spark.conf.set("spark.sql.execution.arrow.enabled", "true") df = spark.createDataframe(temdata) training, test = df.randomSplit([0.8,0.2]) alsExplicit = ALS(maxIter=5, regParam=0.01, userCol="useri", itemCol=" movie_id", ratingCol=" rating") alsImplicit = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,userCol="useri", itemCol=" movie_id", ratingCol=" rating") modelExplicit = alsExplicit.fit(training) modelImplicit = alsImplicit.fit(training) predictionsExplicit = modelExplicit.transform(test) predictionsImplicit = modelImplicit.transform(test) pre_EX=predictionsExplicit.where("prediction!='NaN'") pre_IM=predictionsImplicit.where("prediction!='NaN'") evaluator = Regressionevaluator().setMetricName("rmse").setLabelCol(" rating").setPredictionCol("prediction") rmseExplicit=evaluator.evaluate(pre_EX) rmseIMplicit=evaluator.evaluate(pre_IM) print("Explicit:Root-mean-square error = "+str(rmseExplicit)) print("Explicit:Root-mean-square error = "+str(rmseIMplicit)) b=pre_EX.toPandas() b.groupby('useri').max('prediction').head(5) column = b['useri'].unique() recomend_list={} for i in column: tem=b[b['useri'] == i].head(5)[' movie_id'].to_list() recomend_list[i]=tem column = b['useri'].unique() recomend_list={} for i in column: tem=b[b['useri'] == i].head(5)[' movie_id'].to_list() recomend_list[i]=tem recomend_list tem_l1=[] tem_l2=[] for i,j in recomend_list.items(): tem_l1.append(i) tem_l2.append(j) rec_csv=pd.Dataframe({'user':tem_l1,'recommend_movie':tem_l2}) rec_csv.to_csv(r"/usr/local/spark2/mycode/test_1/recommend_movie.csv")
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)