1.从内存集合中创建RDD
- 从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.parallelize( List(1,2,3,4) ) val rdd2 = sparkContext.makeRDD( List(1,2,3,4) ) rdd1.collect().foreach(println) rdd2.collect().foreach(println) sparkContext.stop()
-
local[*]表示用机器的CPU核数模拟多线程执行计算任务,local表示用单核
-
并行和并发:当电脑有两个CPU核的时候,并行度就是2,如果有4个task,那么并行度也只能是2,剩下的就是并发执行了
-
从底层代码实现来讲,makeRDD方法其实就是parallelize方法
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }
2.从内存中创建RDD的并行度
(1)RDD并行度
- 默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。
- 这个数量可以在构建RDD时指定。记住,并行度 = 并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
- 三个分区,对应三个Task,但是只有一个Executor,单核,那么无法并行执行,只能并发执行,所以分区和并行度不能划等号。
(2)makeRDD创建RDD分区的数量
- makeRDD方法第二个参数表示分区的个数,不传递参数那么会使用默认参数defaultParallism,Spark会从配置对象SparkConf中获取配置参数:spark.defaultParallism。如果获取不到,那么使用totalCores属性,此属性取值为当前Spark运行环境的最大可用核数。这里是local,所以是本地机器的CPU个数。
1.默认:Spark运行环境的core个数
2.手动指定:在SparkConf对象中配置核数:sparkConf.set(“spark.defaul.parallelism”,”5”)
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val dataRDD: RDD[Int] = sparkContext.makeRDD( List(1,2,3,4), 4) val fileRDD: RDD[String] = sparkContext.textFile( "input", 2) fileRDD.collect().foreach(println) sparkContext.stop()
3. 从内存中创建RDD分区数据的分配
从内存中创建RDD,数据是如何分配到各个分区中的
数据分区规则的Spark核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } }
Length=5,nums=3, i = 0 1 2
i=0=>[0,1) =>1
i=1=>[1,3) =>2,3
i=2=>[3,5) => 4,5
4.从外部存储(文件)创建RDD
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、Hbase等。
- textFile()中的路径默认以当前环境的根路径为基准,可以写相对路径也可以写绝对路径,而且也可以写目录路径,读取目录下的多个文件
- Path也可以写通配符 textFile(“datas/1*.txt”)
- Path也可以是分布式存储系统路径:HDFS: hdfs://hadoop102:8020/text.txt
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val fileRDD: RDD[String] = sparkContext.textFile("input") fileRDD.collect().foreach(println) sparkContext.stop()
5.从外部存储(文件)创建RDD-2
- wholeTextFile(path)方法:path和textFile方法一样。
- textFile()以行为单位来读取数据,wholeTextFile以文件为单位读取数据,读取到的都是字符串
- 读取的结果是一个tuple,第一个元素为文件路径,第二个元素为文件内容
Hello world 后面有换行,所以输出的这里也有换行
6. 从文件中创建RDD分区个数的计算规则
从文件创建RDD使用的是Hadoop中的TextFileInputStream,切片规则是一样的。
(1) 默认分区数计量: math.min(defaultParallelism,2)
即运行环境的CPU核数和2的最小值
(2) 指定分区数量:
a) totalSize: 目录下所有文件或者文件的总共字节数
b) goalSize: 每个分区应该存储的字节数。totalSize除以设定的分区数
c) splitSize : Math.max(minSize, Math.min(goalSize, blockSize));
d) 对文件按照splitSize进行切片,当剩余的文件字节数<1.1* splitSize,就不切了。
举例说明:7字节的文件,指定2个分区,实际上得到的是3个分区 totalSize= 7 goalSize=7/2=3 : 最终3个分区的大小分别为:3 3 1
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); ... for (FileStatus file: files) { ... //判断是否切片 if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); ... } protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
7.从文件中创建RDD分区数据的分配规则
(1) 分区是按行获取文件中的数据
(2) 每个分区获取哪些行呢?
根据偏移量的范围确定获取哪些行。第一个字节的偏移量为0,一个字节对应1个单位的偏移量。
- 0号分区偏移量范围:[0,0+goalSize],0号分区获取偏移量0和偏移量goalSize所在的行的全部数据。
- 1号分区偏移量范围:[goalSize,goalSize+goalSize],1号分区获取偏移量goalSize所在行的下一行到2*goalSize所在行的全部数据。
(3) 如果数据源是多个文件,那么是以文件为单位进行分区的。(以文件为单位切片)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)