如今是大数据的时代,数据呈指数型增长,那么如何利用这些数据?离不开大数据计算,今天小普给大家介绍的是:Spark的分布式计算框架,它能很好地适配大数据场景下的计算任务。
【相似度计算】是金融领域或商品推荐等领域的常见需求,如果需要计算M个用户两两之间的相似度情况,若用户特征个数为N。
如果采用循环遍历的计算方式,我们需要计算M*(M-1)次才能得到两两用户之间的相似度情况。
而如果我们采用矩阵计算的方式,只需构造一个M*N维的矩阵A,
做一些简单的矩阵运算:
运算的结果矩阵元素的下标,就对应这用户与用户之间的欧式距离情况,如元素A32, 便是用户3和用户2的欧式距离。在数据量较大的情况,相较于循环遍历的方法,矩阵运算可以节省很多时间。
然而,大数据场景下,数据量经常会变得非常极端,如:
要计算1亿个用户两两之间的相似度,哪怕用户的特征维度仅为5,也需要构造一个100,000,000*5 的矩阵。而矩阵计算过程中的中间矩阵,如
A*AT
会得到100,000,000*100,000,000 的中间矩阵
如果直接进行内存运算的话,大概需要9094Tb 的内存,势必会导致内存溢出。
Spark 的分布式矩阵为上述大数据场景提供了一种可行的解决方案,在介绍分布式矩阵之前,需要先介绍Spark.mllib中的矩阵特质。
Spark的spark.mlib.linalg.distributed所使用的分布式矩阵,使得开发人员可以根据不同的场景、不同的计算需求,将用于计算的矩阵进行拆分,利用map-reduce的思想将整块矩阵的计算map成子矩阵的 *** 作,从而得以在一个矩阵的计算步中,矩阵各部分在不同的计算单元上同步进行,最终将结果reduce汇总,从而得到计算结果。
在介绍分布式矩阵之前,需要先介绍Spark的本地矩阵
LocalMatrix
其中本地矩阵主要可以分为DenseMatrix(稠密矩阵)与SparseMatrix(稀疏矩阵)
DenseMatrix(稠密矩阵)
mlib中存储稠密矩阵的形式只有两种,都需要给定行数和列数,以及数值数组,而后分为按列主序存储(默认)和按行主序存储
图引用自csdn
稠密矩阵就是我们平时常见的矩阵储存方式,通常我们把每一个样本的特征,以行向量的形式存储在矩阵之中。
SparseMatrix(稀疏矩阵)
稀疏矩阵的概念主要是为了区分与稠密矩阵,在现实场景中,由于数据本身的特质(抑或数据缺失等各种原因),会存在矩阵中含0元素非常多的情况。
这时,采用SparseMatrix,仅存储矩阵中的非0元素,便能用较少的空间描述矩阵的本质。
mlib中的SparseMatrix稀疏矩阵以列起始号、行号、元素数值三个数组进行存储,具体的构造方法在之后的内容中将体现,我们先以一张示意图看一下mlib是如何以稀疏矩阵的形式存储矩阵的:
图引用自csdn
DistributedMatrix
spark.mlib.linalg.distributed 中主要包含四种矩阵存储形式类,分别是RowMatrix类、IndexedRowMatrix类、CoordinateMatrix类和BlockMatrix类。下面
RowMatrix类与IndexedRowMatrix类
RowMatrix类与IndexedRowMatrix类相似,矩阵存储时都需要给定行数和列数,以及数值数组,而后分为按列主序存储(默认)和按行主序存储。
但与本地的DenseMatrix不同的是,RowMatrix与IndexedRowMatrix会把一整个完整地矩阵按行进行切分,把一个矩阵分布式地存储与若干个机器之中。
但是,当样本的特征维度过于巨大时(如:行向量Va=(x1,x2,x3...x1,000,000,000)1*1,000,000,000),RowMatrix与IndexedRowMatrix存储形式仍然会有内存溢出的风险。
CoordinateMatrix类
Spark中CoordinateMatrix类的存储方式与本地SparseMatrix的存数形式类似,以始号、行号、元素数值三个数组进行存储,不同的是COordinateMatrix会对稀疏矩阵进行分布式存储。
由于CoordinateMatrix是以三元组的形式进行数据存储,因此,相较于RowMatrix,不容易出现内存溢出的情况。
BlockMatrix类
BlockMatrix是分布式矩阵存储中最常用的类型,它将矩阵按行列分块存储。基于BlockMatrix,
借用BlockMatrix,Spark可以把一个巨大的矩阵拆分为若干个子矩阵。
在矩阵乘法A x B中,分块矩阵乘法结果矩阵中每个子矩阵是A的一个行分块与B的一个列分块相乘得到的结果,而在Spark的mlib中,两个矩阵乘法所产生的task数正是这行列相乘的次数,即将每一个行列相乘在一个计算单元上处理,如果A的行列分块数是m x n,B的行列分块数是n x k,则task的个数,也就是并行度为m x k,当引入 numMidDimSplits 后并行度增加为m x k x numMidDimSplits,实则对A的行子矩阵与B的列子矩阵进行矩阵乘时,再按 numMidDimSplits 的个数对A的行子矩阵和B的列子矩阵进行拆分,从而将每一个A的行子矩阵和B的列子矩阵的乘法拆分成 numMidDimSplits 个矩阵乘法,通过对A的列索引对 numMidDimSplits 取余和B的行索引对 numMidDimSplits 取余可以得到每个子矩阵在再次拆分后的每个计算子块中的下标,实现对矩阵乘法运算更大的并行加速。如下图所示:
通过调整 numMidDimSplits的值对分布式并行矩阵乘法再次分块并行,从而使得并行度增加。
灵活应用Spark的LocalMatrix和DistributedMatrix,可以有避免大数据运算中容易面临内存溢出、运算过慢的情况。
感谢你看到这里,如果你想学习技术干货,收藏栏目不迷路~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)