MLlib ( Machine Learning Library )是 Spark 的一个机器学习库。它能够较容易地解决一些实际的大规模机器学习问题。本实验旨在学习 Spark 的机器学习库—— MLlib 的相关知识,了解 MLlib 与 ML 之间的区别和联系,掌握 MLlib 中的几个基本数据类型
实验时长:90分钟
主要步骤:
学习Mllib的基本数据类型
学习Mllib的基本算法库
利用Mllib算法库中的协同推荐算法为用户推荐电影
2、实验环境
虚拟机数量:1
系统版本:CentOS 7.5
Spark版本:Apache Spark 2.1.1
3、相关技能linux常用命令Spark shellScala语言编程MLlib 库的使用协同过滤算法Spark SQL应用 4、相关知识点
数据类型:本地向量、标签点、本地矩阵稠密向量、稀疏向量MLlib 库的主要内容MLlib 的数据类型和算法种类协同过滤算法Spark SQL 5、效果图
利用ALS模型对测试数据做预测,看到这个误差值是在可以接受的范围内的。详情请看实验步骤
6.1MLlib 的数据类型
6.1.1本地向量
6.1.1.1本地向量(Local Vector)的索引是整型的、从0开始的。而它的值为 Double 类型,存储于单个机器内。 MLlib 支持两种本地向量:稠密向量和稀疏向量
6.1.1.2本地向量的基类是 Vector 类,DenseVector 和 SparseVector 类是其两种不同的实现。官方文档推荐大家使用 Vector 类中已实现的工厂方法来创建本地向量
6.1.1.3安装spark(这里我们只需要单机模式的spark的环境即可),解压tgz下的spark安装包
[zkpk@master ~]$ cd tgz/spark [zkpk@master spark]$ tar -zxvf spark-2.1.1-bin-hadoop-2.7 -C ~/ [zkpk@master spark]$ cd
6.1.1.4打开spark shell终端
[zkpk@master ~]$ cd spark-2.1.1-bin-hadoop2.7/ [zkpk@master spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell
6.1.1.5进入spark shell命令行后,导入spark mllib中的本地向量相关包
scala>import org.apache.spark.mllib.linalg.{Vector, Vectors}
6.1.1.6利用dense方法创建稠密向量(1.0, 0.0, 3.0)
scala>val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
6.1.1.7通过指定非零实体对应的索引和值,来创建稀疏向量 (1.0, 0.0, 3.0)
scala>val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // 3个参数:第一个参数:vector大小;第二个indices:数组,用于保存vector中非零元素,且索引是递增的;第三个:vector非零值组成的数组,且与indices对应
6.1.1.8通过指定非零实体,来创建稀疏向量 (1.0, 0.0, 3.0)
scala>val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
6.1.1.9向量 (1.0, 0.0, 3.0) 写成稠密形式就是 [1.0, 0.0, 3.0],而写成稀疏形式则是(3, [0, 2], [1.0, 3.0]),后者的第一个 3 是指向量的大小。稀疏和稠密的界限没有严格意义上的标准,通常需要依据具体的问题来决定
6.1.1.10在Spark中,如果没有引入org.apache.spark.mllib.linalg.Vector 包的话,Vector 会被认为是scala.collection.immutable.Vector 中定义的那个。因此,应当引入前者以显式地使用 MLlib 中的向量
6.1.2标签点
6.1.2.1标签点(Labeled Point)是一个本地向量,也分稀疏或者稠密,并且是一个带有标签的本地向量。
6.1.2.2在 MLlib 中,标签点常用于监督学习类算法。标签(Label)是用 Double 类型存放的,因此标签点可以用于回归或者分类算法中。如果是二维分类,标签则必须是 0 或 1 之间的一种。而如果是多个维度的分类,标签应当是从 0 开始的数字,代表各个分类的索引。
6.1.2.3标签点是由一个名为 LabeledPoint的 Case Class 样例类定义的
6.1.2.4在spark-shell中创建标签点, 引入标签点相关的包
scala>import org.apache.spark.mllib.linalg.Vectors scala>import org.apache.spark.mllib.regression.LabeledPoint
6.1.2.5创建一个带有正面标签和稠密特征向量的标签点
scala>val lpd= LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
6.1.2.6创建一个带有负面标签和稀疏特征向量的标签点
scala>val lps= LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
6.1.3本地矩阵
6.1.3.1本地矩阵(Local matrix)的索引也是从0开始的,并且是整型。值也为 Double 类型,存储于单个机器内。 Mllib 支持两种本地矩阵:稠密矩阵和稀疏矩阵
6.1.3.2稠密矩阵的实体值以列为主序的形式,存放于单个 Double 型数组内。稀疏矩阵的非零实体以列为主序的形式,存放于压缩稀疏列(Compressed Sparse Column, CSC)中
6.1.3.3本地矩阵的基类是 Matrix 类,在 Spark 中有其两种实现,分别是 DenseMatrix 和 SparseMatrix 。官方文档中推荐使用已在 Matrices 类中实现的工厂方法来创建本地矩阵。需要注意的是,MLlib 中的本地矩阵是列主序的(column-major)
6.1.3.4在spark shell中创建本地矩阵,首先导入矩阵相关包
scala>import org.apache.spark.mllib.linalg.{Matrix, Matrices}
6.1.3.5创建稠密矩阵((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
scala>val md: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // 列主序的;如此例,第一列的值有3个,分别是1.0、3.0、5.0;另外三个值是第二列的值
6.1.3.6创建稀疏矩阵 ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
scala>val ms: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
6.2MLlib算法
6.2.1MLlib 的算法涵盖了基本统计、分类和回归和协同过滤等 9 个大类,而更加新颖和高效的算法也在不断地推陈出新。在 Spark中,主要的算法有:
6.2.1.1基本统计算法
汇总统计
相关统计
分层抽样
假设检验
随机数据生成
核密度估计
6.2.1.2分类和回归
6.2.1.3协同过滤
6.2.1.4聚类
6.2.1.5降维
6.2.1.6特征提取和转化
6.2.1.7频繁项挖掘
6.2.1.8评价指标
6.2.1.9PMML导出
6.2.2由于篇幅有限且算法内容较复杂,这里就不一一讲解,本实验介绍使用一种交替最小二乘法的协同过滤算法
6.3实例:利用算法来推荐电影
6.3.1利用**:quit**命令退出spark shell命令行,解压experiment/mllib目录下的数据集文件
scala>:quit [zkpk@master spark-2.1.1-bin-hadoop2.7]$ cd ~/experiment/mllib [zkpk@master mllib]$ unzip ml-1m.zip [zkpk@master mllib]$ cd ml-1m
6.3.2解压完成之后有三个dat文件,分别是movies.dat、ratings.data、users.dat
6.3.3我们查看下用户信息users.dat文件的前10条,每行的数据格式为`用户ID::性别::年龄::工作::邮编`
[zkpk@master ml-1m]$ head -10 users.dat
6.3.3.1性别部分:M代表男性,F代表女性;
6.3.3.2年龄部分:1表示18岁以下,18表示18-24岁,25表示25-34岁,35表示35-44岁,45表示45-49岁,50表示50-55岁,56表示56岁及以上
6.3.3.3职业部分:职业编号分布在0-20,每一种编号分别表示一种职业,若想查看详情,打开README文件,找到对users.dat的描述的内容
6.3.4我们查看下评论信息ratings.dat文件的前10条,其格式为`用户ID::电影ID::评论星级::时间戳`
[zkpk@master ml-1m]$ head -10 ratings.dat
6.3.4.1电影ID部分:电影ID的范围在1-3952之间,每一个ID代表一部电影
6.3.4.2评论星级部分:共五星用1-5的数字表示
6.3.4.3时间戳部分:评论发布的时间戳
6.3.5我们查看下电影信息movies.dat文件的前10条,其格式为`电影ID::电影标题::电影流派`
[zkpk@master ml-1m]$ head -10 movies.dat
6.3.5.1电影标题部分:电影的发布名称和发布年份等
6.3.5.2电影流派部分:共18个流派,一部电影可能属于一个或者多个流派,各流派如下:Action(行动)、Adventure(冒险)、Animation(动画)、Children’s(儿童)、Comedy(喜剧)、Crime(犯罪)、documentary(纪录片)、Drama(戏剧)、Fantasy(幻想)、Film-Noir(黑色电影)、Horror(恐怖)、Musical(音乐)、Mystery(神秘)、Romance(浪漫)、Sci-Fi(科幻)、Thriller(惊悚)、War(战争)、Western(西方)
6.3.6重新进入Spark shell
[zkpk@master ml-1m]$ cd ~/spark-2.1.1-bin-hadoop2.7/ [zkpk@master spark-2.1.1-bin-hadoop2.7]$ bin/spark-shell
6.3.7开始编写实验代码
6.3.7.1引入本节实验要使用的相关spark包
scala>import spark.implicits._ scala>import org.apache.spark.rdd._ scala>import org.apache.spark.sql._ scala>import org.apache.spark.mllib.recommendation.Rating scala>import org.apache.spark.mllib.recommendation.ALS scala>import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
6.3.7.2首先对电影数据建立一个 Case Class,它对应了 movies.dat 文件中的部分字段(其中 Genres(流派)字段不是我们所需要的,因此在 Case Class 中没有设置对应的成员变量)
scala>case class Movie(movieId: Int, title: String)
6.3.7.3同样的,我们为用户数据建立一个 Case Class,它对应了 users.dat 文件中的部分字段,(本实验只使用了 用户ID 作为运算时的输入)
scala>case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
6.3.7.4我们不需定义ratings.dat数据的Case Class,因为评价类型的数据在spark机器学习框架中已经有org.apache.spark.mllib.recommendation.Rating类来做映射,在一开始的导包中我们已经导入了该包
6.3.7.5定义三个解析数据函数。这些函数将用于解析数据集中的每一行内容,去掉分隔符 ::,然后把数据映射到 Case Class 的对应成员中,因此,针对三种数据,分别编写对应解析函数
// 解析 movies.dat 文件的函数;参数为此文件中的一行内容 scala> def parseMovieData(data: String): Movie = { val dataField = data.split("::") assert(dataField.size == 3) Movie(dataField(0).toInt, dataField(1)) } // 解析 users.dat 文件的函数;参数为此文件中的一行内容 scala> def parseUserData(data: String): User = { val dataField = data.split("::") assert(dataField.size == 5) User(dataField(0).toInt, dataField(1), dataField(2).toInt, dataField(3).toInt, dataField(4)) } // 解析 ratings.dat 文件的函数,这里用到的 Rating 类是在org.apache.spark.mllib.recommendation.Rating 包中定义的 scala> def parseRatingData(data: String): Rating = { val dataField = data.split("::") Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble) }
6.3.8定义好三种数据对应的解析函数后,下面我们将他们导入到 RDD 中
scala>val moviesData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/movies.dat").map(parseMovieData).cache() //cache()将RDD缓存起来 scala>val usersData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/users.dat").map(parseUserData).cache() scala>val ratingsData = sc.textFile("/home/zkpk/experiment/mllib/ml-1m/ratings.dat").map(parseRatingData).cache()
6.3.9通过一些方法来查看数据的某些特征
6.3.9.1看一下评价数据到底有多少个
scala>val amountOfRatings = ratingsData.count()
6.3.9.2看一下多有少个用户评价了电影
scala>val amountOfUsers = ratingsData.map(_.user).distinct().count() //ratingsData是一个RDD,其元素类型是Rating;_.user中_代表RDD中的一个个Rating对象,然后取出此对象的user属性;再去重;计数
6.3.10将这些 RDD 转化为 Dataframe ,用于后续的 *** 作
scala>val moviesDF = moviesData.toDF() scala>val usersDF = usersData.toDF() scala>val ratingsDF = ratingsData.toDF()
6.3.11将这几个 Dataframe 注册为临时表
scala>moviesDF.registerTempTable("movies") scala>usersDF.registerTempTable("users") scala>ratingsDF.registerTempTable("ratings")
6.3.12利用一个 SQL 查询,获取id是1680的用户的评价高于 4.5 分的电影有哪些
scala>val highRatingMovies = spark.sql("SELECt ratings.user, ratings.product, ratings.rating, movies.title FROM ratings JOIN movies ON movies.movieId=ratings.product WHERe ratings.user=1680 and ratings.rating > 4.5 ORDER BY ratings.rating DESC ")
6.3.13检索完成后,调用 show() 函数查看前20条结果,结果格式为:用户ID,电影ID,用户评分,电影标题(每次实验结果可能略有不同)
scala>highRatingMovies.show()
6.3.14开始机器学习,将已有的数据划分为两部分:训练集、测试集
6.3.15继续在spark shell中编程,将评分数据集按照训练集70%,测试集30%来随机切分,250L是随机种子数
scala>val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 250L)
6.3.15.1这里用到了 randomSplit 函数,两个参数分别为划分比例、产生随机数的种子值
6.3.16然后我们将划分的结果存放在两个不同的变量中:
scala>val trainingSetOfRatingsData = tempPartitions(0).cache() // 训练集 scala>val testSetOfRatingData = tempPartitions(1).cache() // 测试集
6.3.17将训练集用于 ALS 算法
scala>val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
6.3.17.1MLlib 中的 ALS 算法需要设置三个参数,分别是特征矩阵的秩数量 setRank、迭代次数 setIterations 和训练用的训练数据集。其中特征矩阵的秩设置为 20,表明生成的 userFeatures 和 itemFeatures 的个数是20
6.3.18生成推荐结果,通过 MatrixFactorizationModel 类的 recommendProducts 方法,我们可以对某个用户生成推荐结果(每次实验结果可能略有不同)
scala>val recomResult = recomModel.recommendProducts(1234, 10) // 1234是userid
6.3.18.11234 是我们随意指定的一个用户 ID,做实验时也可以修改成其他用户的 ID 。10 是产生推荐的数量
6.3.19也可以利用mkString展现更直观的推荐结果
scala>println(recomResult.mkString("n"))
6.3.19.1输出数据的格式是 Rating 类的格式:用户ID、电影ID和该用户对电影可能的评分。
6.3.20但仅看到电影的 ID,并不知道具体是什么电影。所以还需要按照电影的 ID找到对应的电影标题:
scala>val movieTitles = moviesDF.rdd.map(array => (array(0), array(1))).collectAsMap() // 0、1分别是movieId、title scala>val recomResultWithTitle = recomResult.map(rating => (movieTitles(rating.product), rating.rating))
6.3.21然后再把结果输出,就可以看到推荐的电影名称了:
scala>println(recomResultWithTitle.mkString("n"))
6.3.22模型评价,评价的方式便是将上边得到的推荐结果(评分预测值)与测试集中实际结果相比较。利用 MatrixFactorizationModel 类的 predict 函数来得到预测的评分值
scala>val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map { case Rating(user, product, rating) => (user, product) })
6.3.23得到对测试集的预测评分结果后,用 map 、 join算子将它与测试集的原始数据组合成 **((用户ID, 电影ID), (测试集原有评分, 预测评分))**的数据结构。这个格式是 Key-Value 形式的,Key 为 (user, product)。我们是要把这里的测试集原有评分与预测时得到的评分相比较,二者的联系就是 user 和 product 相同。第一个 map 可以将 (用户ID, 电影ID) 与 测试集原有评分 组合成 KV 格式;第二个 map *** 作,可以将 (用户ID, 电影ID) 与 预测评分 组合成 KV 格式
// 测试集 scala>val formatResultOfTestSet = testSetOfRatingData.map { case Rating(user, product, rating) => ((user, product), rating) } // 预测结果 scala>val formatResultOfPredictionResult = predictResultOfTestSet.map { case Rating(user, product, rating) => ((user, product), rating) }
6.3.24然后利用join算子将二者连接起来,Key 相同的不同值就整合在一起形成了 ((用户ID, 电影ID), (测试集原有评分, 预测评分)) 的格式。
scala>val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
6.3.25最后利用这个结果来计算预测评分和实际评分之间的平均绝对误差。平均绝对误差(Mean Absolute Error)是所有单个观测值与算术平均值偏差的绝对值的平均。与平均误差相比,平均绝对误差由于被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。我们直接取finalResultForComparison 结果中 ratingOfTest 和 ratingOfPrediction两个值,先算误差,再取绝对值。最后对所有的绝对值计算平均数
scala>val MAE = finalResultForComparison.map { case ((user, product), (ratingOfTest, ratingOfPrediction)) => {val deviation= (ratingOfTest - ratingOfPrediction) Math.abs(deviation)} }.mean()
6.3.26查看终端结果(每次实验结果可能略有不同),可以看到这个误差值是在可以接受的范围内的
算术平均值偏差的绝对值的平均。与平均误差相比,平均绝对误差由于被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。我们直接取finalResultForComparison 结果中 ratingOfTest 和 ratingOfPrediction两个值,先算误差,再取绝对值。最后对所有的绝对值计算平均数
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)