Spark Machine Learning的数据类型及使用示例代码

Spark Machine Learning的数据类型及使用示例代码,第1张

Spark Machine Learning的数据类型及使用示例代码 Spark ML的数据类型 Local vector

本地向量具有整数类型和基于0的索引和double类型的值,存储在一台机器上。
MLlib支持两种类型的本地向量

  • 稠密本地向量 dense local vector
  • 稀疏本地向量 sparse local vector
	import org.apache.spark.mllib.linalg.{Vector, Vectors}
    ## 创建稠密向量
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    
    ## 基于指定对应于非零项的索引和值创建稀疏向量
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
Labeled point

标记点是一个与标记/响应相关的本地向量,可以是密集的,也可以是稀疏的。在MLlib中,标记点用于监督学习算法。
稀疏训练数据在实际的机器学习工作中是很常见的。
MLlib支持LIBSVM格式,这是LIBSVM、LIBLINEAR的默认格式

数据格式如下:

label index1:value1 index2:value2 ...

LIBSVM 和 LIBLINEAR的区别是:

  • LIBSVM是一套完整的SVM模型实现。用户可以在LIBSVM中使用核函数来训练非线性的分类器,当然也能使用更基础的线性SVM。
  • LIBLINEAR是一个针对线性分类场景而设计的工具包,除了支持线性SVM外,还支持线性的Logistic Regression等模型,但是无法通过定义核函数方式实现非线性的分类器
  • 在进行线性分类时LIBSVM和LIBLINEAR都可以达到类似的结果,但是LIBLINEAR无论是在训练上还是在预测上,都比LIBSVM高效得多,因为LIBLINEAR本身就是为了解决较大规模样本的模型训练而设计的
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
Local matrix

本地矩阵具有整型的行和列索引以及double类型的值,存储在一台机器上。
MLlib支持密集矩阵和稀疏矩阵

  • 密集矩阵的项值以列主顺序存储在单个双数组中,
  • 稀疏矩阵的非零项值以列主顺序存储在压缩稀疏列(CSC)格式中

    Compressed Sparse Column Format (CSC)的目的:为了压缩矩阵,减少矩阵存储所占用的空间。
    实现思路:通过增加一些"元信息"来描述矩阵中的非零元素存储的位置(基于列),然后结合非零元素的值来表示矩阵
    CSC使用三个数组来构稀疏矩阵:
  • 第一个数组:
    • 元素个数就是(矩阵的列数+1)
    • 第一个元素一直是0。第二个元素是第一列的非零元素的数量
    • 后续的值为前一个值 + 下一列非零元素的数量
  • 第二个数组:表示的是每一列,非零元素所在的行号,行号从0开始【注意,针对同一列,行号顺序没有要求必须从上到下】,
  • 第三个数组:所有非零元素的值(基于列序,即循环列)
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

CSC示例如下:

Distributed matrix

分布式矩阵具有long类型的行和列索引以及double类型值,分布存储在一个或多个rdd中。

选择合适的格式来存储大型分布式矩阵是非常重要的,将分布式矩阵转换为另一种格式可能需要全局shuffle,这是非常昂贵的。

目前已经实现了四种类型的分布式矩阵

  • RowMatrix
  • IndexedRowMatrix
  • CoordinateMatrix
  • BlockMatrix
    注意:分布式矩阵的基本rdd必须是确定性的,因为缓存了矩阵的大小。通常,使用非确定性的rdd会导致错误
RowMatrix

RowMatrix是一个面向行但没有行索引的分布式矩阵,其中每一行是一个local vector。由于每一行都由一个local vector表示,所以列的数量受整数范围的限制,但实际上应该更小。

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val v0=Vectors.dense(1.0,0.0,3.0)
val v1=Vectors.dense(3.0,2.0,4.0)

val rows =sc.parallelize(Seq(v0,v1))
val mat: RowMatrix = new RowMatrix(rows)

//向量运算 2行 3 列:
val m = mat.numRows()
val n = mat.numCols()
mat.rows.foreach(println)

//通过computeColumnSummaryStatistics()方法获取统计摘要
val summary = mat.computeColumnSummaryStatistics()
print(summary.max)
//[3.0,2.0,4.0]

//以通过summary实例来获取矩阵的相关统计信息,例如行数 2
print(summary.count)
//方差向量[2.0,2.0,0.5]
print(summary.variance)
//平均向量[2.0,1.0,3.5]
print(summary.mean)
//L1范数向量[4.0,2.0,7.0]
print(summary.normL1)

IndexedRowMatrix

IndexedRowMatrix类似于RowMatrix,但具有有意义的行索引。每一行都由它的索引(长类型)和一个local vector表示。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors

val idxr1 = IndexedRow(1,Vectors.dense(1.0,0.0,3.0))
val idxr2 = IndexedRow(2,Vectors.dense(3.0,2.0,4.0))
val idxr3 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0))
val idxr4 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0))
val idxr5 = IndexedRow(3,Vectors.dense(3.0,2.0,4.0))
val idxrows = sc.parallelize(Array(idxr1,idxr2,idxr3,idxr4,idxr5))
val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
idxmat.rows.foreach(println)
//打印行数 3
print(idxmat.columnSimilarities().numRows)
//打印行数 4 是索引的最大值+1, 不是指行数
print(idxmat.numRows)
CoordinateMatrix

坐标矩阵CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。
每一个矩阵项MatrixEntry都是一个三元组:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。

坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。
CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowIndex, colIndex, elem)的三元组

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries = sc.parallelize(Array(new MatrixEntry(0,1,0.5),new MatrixEntry(2,2,1.8)))
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
coordMat.entries.foreach(println)
//MatrixEntry(0,1,0.5)
//MatrixEntry(2,2,1.8)

//将coordMat进行转置
val transMat: CoordinateMatrix = coordMat.transpose()
transMat.entries.foreach(println)
//MatrixEntry(1,0,0.5)
//MatrixEntry(2,2,1.8)
//将坐标矩阵转换成一个索引行矩阵
val indexedRowMatrix = transMat.toIndexedRowMatrix()
indexedRowMatrix.rows.foreach(println)
//IndexedRow(2,(3,[2],[1.8]))
//IndexedRow(1,(3,[0],[0.5]))

BlockMatrix

分块矩阵是基于矩阵块MatrixBlock构成的RDD的分布式矩阵,
其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。

分块矩阵支持和另一个分块矩阵进行加法 *** 作和乘法 *** 作,并提供了一个支持方法validate()来确认分块矩阵是否创建成功。

分块矩阵可由索引行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换,该方法将矩阵划分成尺寸默认为1024×1024的分块,可以在调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法时传入参数来调整分块的尺寸。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
val ent1 = new MatrixEntry(0,0,1)
val ent2 = new MatrixEntry(1,1,1)
val ent3 = new MatrixEntry(2,0,-1)
val ent4 = new MatrixEntry(2,1,2)
val ent5 = new MatrixEntry(2,2,1)
val ent6 = new MatrixEntry(3,0,1)
val ent7 = new MatrixEntry(3,1,1)
val ent8 = new MatrixEntry(3,3,1)
//创建RDD[MatrixEntry]
val entries = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
//将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()
//可以用validate()方法判断是否分块成功
matA.validate()
matA.toLocalMatrix
//    1.0   0.0  0.0  0.0
//    0.0   1.0  0.0  0.0
//    -1.0  2.0  1.0  0.0
//    1.0   1.0  0.0  1.0
    
    
// 查看其分块情况 2 2
matA.numColBlocks
matA.numRowBlocks

// 计算矩阵A和其转置矩阵的积矩阵
matA.transpose.multiply(matA).toLocalMatrix
//  res4: org.apache.spark.mllib.linalg.Matrix =
//	3.0   -1.0  -1.0  1.0
//	-1.0  6.0   2.0   1.0
//	-1.0  2.0   1.0   0.0
//	1.0   1.0   0.0   1.0

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5117987.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存