本地向量具有整数类型和基于0的索引和double类型的值,存储在一台机器上。
MLlib支持两种类型的本地向量
- 稠密本地向量 dense local vector
- 稀疏本地向量 sparse local vector
import org.apache.spark.mllib.linalg.{Vector, Vectors} ## 创建稠密向量 val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) ## 基于指定对应于非零项的索引和值创建稀疏向量 val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))Labeled point
标记点是一个与标记/响应相关的本地向量,可以是密集的,也可以是稀疏的。在MLlib中,标记点用于监督学习算法。
稀疏训练数据在实际的机器学习工作中是很常见的。
MLlib支持LIBSVM格式,这是LIBSVM、LIBLINEAR的默认格式
数据格式如下:
label index1:value1 index2:value2 ...
LIBSVM 和 LIBLINEAR的区别是:
- LIBSVM是一套完整的SVM模型实现。用户可以在LIBSVM中使用核函数来训练非线性的分类器,当然也能使用更基础的线性SVM。
- LIBLINEAR是一个针对线性分类场景而设计的工具包,除了支持线性SVM外,还支持线性的Logistic Regression等模型,但是无法通过定义核函数方式实现非线性的分类器
- 在进行线性分类时LIBSVM和LIBLINEAR都可以达到类似的结果,但是LIBLINEAR无论是在训练上还是在预测上,都比LIBSVM高效得多,因为LIBLINEAR本身就是为了解决较大规模样本的模型训练而设计的
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))) val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")Local matrix
本地矩阵具有整型的行和列索引以及double类型的值,存储在一台机器上。
MLlib支持密集矩阵和稀疏矩阵
- 密集矩阵的项值以列主顺序存储在单个双数组中,
- 稀疏矩阵的非零项值以列主顺序存储在压缩稀疏列(CSC)格式中
Compressed Sparse Column Format (CSC)的目的:为了压缩矩阵,减少矩阵存储所占用的空间。
实现思路:通过增加一些"元信息"来描述矩阵中的非零元素存储的位置(基于列),然后结合非零元素的值来表示矩阵
CSC使用三个数组来构稀疏矩阵: - 第一个数组:
- 元素个数就是(矩阵的列数+1)
- 第一个元素一直是0。第二个元素是第一列的非零元素的数量
- 后续的值为前一个值 + 下一列非零元素的数量
- 第二个数组:表示的是每一列,非零元素所在的行号,行号从0开始【注意,针对同一列,行号顺序没有要求必须从上到下】,
- 第三个数组:所有非零元素的值(基于列序,即循环列)
import org.apache.spark.mllib.linalg.{Matrix, Matrices} val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
CSC示例如下:
分布式矩阵具有long类型的行和列索引以及double类型值,分布存储在一个或多个rdd中。
选择合适的格式来存储大型分布式矩阵是非常重要的,将分布式矩阵转换为另一种格式可能需要全局shuffle,这是非常昂贵的。
目前已经实现了四种类型的分布式矩阵
- RowMatrix
- IndexedRowMatrix
- CoordinateMatrix
- BlockMatrix
注意:分布式矩阵的基本rdd必须是确定性的,因为缓存了矩阵的大小。通常,使用非确定性的rdd会导致错误
RowMatrix是一个面向行但没有行索引的分布式矩阵,其中每一行是一个local vector。由于每一行都由一个local vector表示,所以列的数量受整数范围的限制,但实际上应该更小。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val v0=Vectors.dense(1.0,0.0,3.0) val v1=Vectors.dense(3.0,2.0,4.0) val rows =sc.parallelize(Seq(v0,v1)) val mat: RowMatrix = new RowMatrix(rows) //向量运算 2行 3 列: val m = mat.numRows() val n = mat.numCols() mat.rows.foreach(println) //通过computeColumnSummaryStatistics()方法获取统计摘要 val summary = mat.computeColumnSummaryStatistics() print(summary.max) //[3.0,2.0,4.0] //以通过summary实例来获取矩阵的相关统计信息,例如行数 2 print(summary.count) //方差向量[2.0,2.0,0.5] print(summary.variance) //平均向量[2.0,1.0,3.5] print(summary.mean) //L1范数向量[4.0,2.0,7.0] print(summary.normL1)IndexedRowMatrix
IndexedRowMatrix类似于RowMatrix,但具有有意义的行索引。每一行都由它的索引(长类型)和一个local vector表示。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors val idxr1 = IndexedRow(1,Vectors.dense(1.0,0.0,3.0)) val idxr2 = IndexedRow(2,Vectors.dense(3.0,2.0,4.0)) val idxr3 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0)) val idxr4 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0)) val idxr5 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0)) val idxrows = sc.parallelize(Array(idxr1,idxr2,idxr3,idxr4,idxr5)) val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows) idxmat.rows.foreach(println) //打印行数 3 print(idxmat.columnSimilarities().numRows) //打印行数 4 是索引的最大值+1, 不是指行数 print(idxmat.numRows)CoordinateMatrix
坐标矩阵CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。
每一个矩阵项MatrixEntry都是一个三元组:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。
坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。
CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowIndex, colIndex, elem)的三元组
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries = sc.parallelize(Array(new MatrixEntry(0,1,0.5),new MatrixEntry(2,2,1.8))) val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) coordMat.entries.foreach(println) //MatrixEntry(0,1,0.5) //MatrixEntry(2,2,1.8) //将coordMat进行转置 val transMat: CoordinateMatrix = coordMat.transpose() transMat.entries.foreach(println) //MatrixEntry(1,0,0.5) //MatrixEntry(2,2,1.8) //将坐标矩阵转换成一个索引行矩阵 val indexedRowMatrix = transMat.toIndexedRowMatrix() indexedRowMatrix.rows.foreach(println) //IndexedRow(2,(3,[2],[1.8])) //IndexedRow(1,(3,[0],[0.5]))BlockMatrix
分块矩阵是基于矩阵块MatrixBlock构成的RDD的分布式矩阵,
其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。
分块矩阵支持和另一个分块矩阵进行加法 *** 作和乘法 *** 作,并提供了一个支持方法validate()来确认分块矩阵是否创建成功。
分块矩阵可由索引行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换,该方法将矩阵划分成尺寸默认为1024×1024的分块,可以在调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法时传入参数来调整分块的尺寸。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} import org.apache.spark.mllib.linalg.distributed.BlockMatrix val ent1 = new MatrixEntry(0,0,1) val ent2 = new MatrixEntry(1,1,1) val ent3 = new MatrixEntry(2,0,-1) val ent4 = new MatrixEntry(2,1,2) val ent5 = new MatrixEntry(2,2,1) val ent6 = new MatrixEntry(3,0,1) val ent7 = new MatrixEntry(3,1,1) val ent8 = new MatrixEntry(3,3,1) //创建RDD[MatrixEntry] val entries = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8)) val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) //将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入 val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache() //可以用validate()方法判断是否分块成功 matA.validate() matA.toLocalMatrix // 1.0 0.0 0.0 0.0 // 0.0 1.0 0.0 0.0 // -1.0 2.0 1.0 0.0 // 1.0 1.0 0.0 1.0 // 查看其分块情况 2 2 matA.numColBlocks matA.numRowBlocks // 计算矩阵A和其转置矩阵的积矩阵 matA.transpose.multiply(matA).toLocalMatrix // res4: org.apache.spark.mllib.linalg.Matrix = // 3.0 -1.0 -1.0 1.0 // -1.0 6.0 2.0 1.0 // -1.0 2.0 1.0 0.0 // 1.0 1.0 0.0 1.0
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)