在虚拟机采用HDFS、Spark采用协同过滤算法进行电影的推荐

在虚拟机采用HDFS、Spark采用协同过滤算法进行电影的推荐,第1张

在虚拟机采用HDFS、Spark采用协同过滤算法进行电影的推荐

文章目录

相关参考内容一.数据来源二.采用HDFS对数据集进行存储和管理三.采用pyspark的交互式程序,根据用户的评分数据ratings.csv,采用协同过滤算法进行电影的推荐。

3.1获取hdfs地址3.2导入相关包3.3读取数据并展示3.4数据处理3.5划分训练集和测试集,对数据进行训练学习得到模型3.6模型评估3.7文件存储 四.相关代码如下

相关参考内容

更改虚拟机默认python版本为3.5
推荐系统实战数据集下载地址
Pyspark.sql Dataframe 创建、 *** 作、输出
Spark 2.1.0 入门:协同过滤算法(Python版)
厦门大学数据库实验室

一.数据来源

1834 位用户对电影的打分数据,总共1028751条。
数据说明:
ratings.csv 文件包含如下所述的标题:
userId:散列的 user_id。
movieId:用户(对应userId)评分的电影的id。
评级:用户提供的评级(从 0.5 到 5 星)。
tstamp:用户评价电影的时间。

二.采用HDFS对数据集进行存储和管理

将获取到的rating数据集上传到虚拟机的:r"/usr/local/spark2/mycode/test_1/ratings.csv处。
启动hadoop,进行hdfs存储数据

cd /usr/local/hadoop
 ./sbin/start-dfs.sh



将本地数据:ratings.csv上传致hdfs中的 /user/hadoop/input 中

/bin/hdfs dfs -put  /usr/local/spark2/mycode/test_1/ratings.csv input
./bin/hdfs dfs -ls input

三.采用pyspark的交互式程序,根据用户的评分数据ratings.csv,采用协同过滤算法进行电影的推荐。 3.1获取hdfs地址
hdfs getconf -confKey fs.default.name

启动pyspark

3.2导入相关包
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import Regressionevaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import  pandas as pd
import numpy as np

3.3读取数据并展示
sc = SparkContext()
sqlcontext = SQLContext(sc)
data=sqlcontext.read.format("com.databricks.spark.csv").options(header="true",inferschema="true").load("hdfs://localhost:9000/user/hadoop/input/ratings.csv")
data.show()

3.4数据处理

需要进数据都转化为数字型数据,所以useri一栏需要进行处理。

tem=temdata['useri'].unique().tolist()
user=np.arange(0,len(tem),1)
temdata['useri']=temdata['useri'].map(dict(zip(tem,user)))
temdata=temdata.iloc[:,0:3]
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataframe(temdata)


转化后的数据如下

3.5划分训练集和测试集,对数据进行训练学习得到模型

使用ALS来建立推荐模型,这里我们构建了两个模型,一个是显性反馈,一个是隐性反馈。

training, test = df.randomSplit([0.8,0.2])
alsExplicit  = ALS(maxIter=5, regParam=0.01, userCol="useri", itemCol=" movie_id", ratingCol=" rating")
alsImplicit = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,userCol="useri", itemCol=" movie_id", ratingCol=" rating")
modelExplicit = alsExplicit.fit(training)
modelImplicit = alsImplicit.fit(training)
predictionsExplicit = modelExplicit.transform(test)
predictionsImplicit = modelImplicit.transform(test)
pre_EX=predictionsExplicit.where("prediction!='NaN'")
pre_IM=predictionsImplicit.where("prediction!='NaN'")


我们把结果输出,对比一下真实结果与预测结果

3.6模型评估

通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确

evaluator=Regressionevaluator().setMetricName("rmse").setLabelCol(" rating").setPredictionCol("prediction")
rmseExplicit=evaluator.evaluate(pre_EX)
rmseIMplicit=evaluator.evaluate(pre_IM)
print("Explicit:Root-mean-square error = "+str(rmseExplicit))
print("Explicit:Root-mean-square error = "+str(rmseIMplicit))


可以看到显性反馈模型打分的均方差值为0.82,说明预测较好。

将每个用户的预测评分情况输出打分最高的5部电影的得分和名称进行电影的推荐:

b=pre_EX.toPandas()
b.groupby('useri').max('prediction').head(5)
column = b['useri'].unique()
recomend_list={}
for i in column:
    tem=b[b['useri'] == i].head(5)[' movie_id'].to_list()
    recomend_list[i]=tem
column = b['useri'].unique()                                                
recomend_list={}
for i in column:
     tem=b[b['useri'] == i].head(5)[' movie_id'].to_list()
     recomend_list[i]=tem
recomend_list


这是为每个用户生成的5部电影推荐列表。

3.7文件存储

将推荐文档进行存储为:recommend_movie.csv

tem_l1=[]
tem_l2=[]
for i,j in recomend_list.items():
    tem_l1.append(i)
    tem_l2.append(j)
rec_csv=pd.Dataframe({'user':tem_l1,'recommend_movie':tem_l2})
rec_csv.to_csv(r"/usr/local/spark2/mycode/test_1/recommend_movie.csv")

四.相关代码如下
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import Regressionevaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import  pandas as pd
import numpy as np

sc = SparkContext()
sqlcontext = SQLContext(sc)
data=sqlcontext.read.format("com.databricks.spark.csv").options(header="true",inferschema="true").load("hdfs://localhost:9000/user/hadoop/input/ratings.csv")
data.show()
tem=temdata['useri'].unique().tolist()
user=np.arange(0,len(tem),1)
temdata['useri']=temdata['useri'].map(dict(zip(tem,user)))
temdata=temdata.iloc[:,0:3]
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataframe(temdata)
training, test = df.randomSplit([0.8,0.2])
alsExplicit  = ALS(maxIter=5, regParam=0.01, userCol="useri", itemCol=" movie_id", ratingCol=" rating")
alsImplicit = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,userCol="useri", itemCol=" movie_id", ratingCol=" rating")
modelExplicit = alsExplicit.fit(training)
modelImplicit = alsImplicit.fit(training)
predictionsExplicit = modelExplicit.transform(test)
predictionsImplicit = modelImplicit.transform(test)
pre_EX=predictionsExplicit.where("prediction!='NaN'")
pre_IM=predictionsImplicit.where("prediction!='NaN'")
evaluator = Regressionevaluator().setMetricName("rmse").setLabelCol(" rating").setPredictionCol("prediction")
rmseExplicit=evaluator.evaluate(pre_EX)
rmseIMplicit=evaluator.evaluate(pre_IM)
print("Explicit:Root-mean-square error = "+str(rmseExplicit))
print("Explicit:Root-mean-square error = "+str(rmseIMplicit))

b=pre_EX.toPandas()
b.groupby('useri').max('prediction').head(5)
column = b['useri'].unique()
recomend_list={}
for i in column:
    tem=b[b['useri'] == i].head(5)[' movie_id'].to_list()
    recomend_list[i]=tem
column = b['useri'].unique()                                                
recomend_list={}
for i in column:
     tem=b[b['useri'] == i].head(5)[' movie_id'].to_list()
     recomend_list[i]=tem
recomend_list
tem_l1=[]
tem_l2=[]
for i,j in recomend_list.items():
    tem_l1.append(i)
    tem_l2.append(j)
rec_csv=pd.Dataframe({'user':tem_l1,'recommend_movie':tem_l2})
rec_csv.to_csv(r"/usr/local/spark2/mycode/test_1/recommend_movie.csv")


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5705013.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存