Spark-核心数据集RDD(六)

Spark-核心数据集RDD(六),第1张

Spark-核心数据集RDD(六)
SparkCore-核心数据集RDD

  今天真是美好的一天啊,那我们开始吧,我们今天讲一下RDD,为什么要将RDD了,先说一下我,作为一枚标准的理工男,如果没有彻底弄明白一个东西,就去实 *** ,那肯定是一脸懵逼的,即使瞎一道题目猫碰上死耗子,暂时有了正确结果,但是题目文件类型一变,那又将是懵逼树上懵逼果,懵逼树下你和我。还记得高中化学,1mol水分子=2mol氢原子+1mol氧原子,没有弄明白mol的我,为什么2+1=1???


  好吧 就这样。因为初步我们的数据集都将是一个一个的RDD(这里我的理解是RDD是一个模板,会随数据复刻出许许多多的RDD),但是不明白RDD,我们很难讲一个RDD转化为另一个RDD。在SparkCore中,数据处理就是RDD之间的互相转换。
在Spark源码中对于RDD的描述是这样的:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; DoubleRDDFunctions contains operations available only on RDDs of Doubles; and SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit.
Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper  for more details on RDD internals.

说人话就是:Resilient Distributed Dataset (RDD)就是d性分布式数据集。
它含有五大属性:

  1. A list of partitions
  2. A function for computing each split
  3. A list of dependencies on other RDDs
  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

好吧,我不是人。

1、RDD的理解

RDD了。我们可以将 RDD 理解为一个分布式对象集合(数据模板),本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算
阁下请看图:

我们就好比RDD是一个长度为4的二维列表,四个小列表分别存储在不同的集群节点上(这仅代表我不成熟的想法)。
RDD 具有容错机制,并且只读不能修改,可以执行确定的转换 *** 作创建新的 RDD。具体来讲,RDD 具有以下几个属性。

只读:不能修改,只能通过转换 *** 作生成新的 RDD。
分布式:可以分布在多台机器上进行并行处理。
d性:计算过程中内存不够时它会和磁盘进行数据交换。
基于内存:可以全部或部分缓存在内存中,在多次计算间重用。


2、RDD的属性

  我们又要回到官方的解释了,别怕哈,这次我们来翻译。

  1. A list of partitions
    即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

  2. A function for computing each split
    一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

  3. A list of dependencies on other RDDs
    RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

这样的解释不知道大家有没有一点理解哈,*** 这里说一下,上面的翻译是我学习其他博主的:解释来源。

现在我说一下我自己的浅显理解。

2.1、A list of partitions

  一个分区列表,RDD是一个分布式的数据模板(我一直不愿意称之为数据集,因为其只是将我们的数据按照这个RDD的模板复刻出一个相同外形的数据集而已,就好比我们伟大的活字印刷,如下图,一个RDD就是一个模板,很形象)。

  在我们的集群中,我们的数据分布在不同的机器上,所以可能每个集群节点都有这样的RDD,如果我们不指定,默认是任务的总核数就是每个RDD的分区数(partiotioner)。一组分区列表就指这些不同集群节点下的RDD分区组成的集合。再举个例子:
加入我们有一个数据集{1,2,3,4,5,6,7,8,9}均分布在各个节点,一个有三个executor-core,那么数据有可能会被分成{1,2,4},{3,5,6},{7,8,9}三个分区处理。但我如果我们指定fRDD分区数为2,那么就是{1,2,3,4,5},{1,2,3,4,5},那么这些数据处理分区集合就是一个分区列表。以上只是例子,实际分配按照实际数据分布和分区逻辑。

2.2、A list of partitions

  这个可能比较好理解,分区计算函数,类似与活字印刷的模板规则。就是数据处理的函数。比如rdd.map(_*10),
这里的 _*10,就是计算函数,将待处理的数据乘以2。也是我们主要要干的事。那这个特性就这样?不理解的自己去趟厕所哈。看图理解。


2.3、 A list of dependencies on other RDDs

  RDD之间的依赖关系,我们说Spark数据处理,就是RDD迫使数据之间的相互转换,当我们数据集经过一个RDD的计算后,会附带这个计算规则和数据来源,以下是全部的四种依赖关系,有了这些依赖关系,便可以形成任务的DAG(有向无环图),这个DAG会构造成很多个阶段,这些阶段叫做stage然后,多个stage组成一个任务,每一个行动算子就是一个任务。这个也很好理解。但是后面三种涉及到shuffle落盘 *** 作,我们后边再说,当有了这些个依赖关系,即使出现数据丢失,也可以通过依赖关系重新计算丢失的数据。

2.4、 a Partitioner for key-value RDDs
  RDD分区函数,前面说了一个RDD有很多分区,但是数据怎么决定被分配到哪个RDD分区接受改造了?这里就出现了RDD分区函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

2.4、 a list of preferred locations to compute each split on
  每一分片的优先计算位置,依靠移动数据不如移动计算的原则。解释一下,假如我们有三个集群节点A,B,C,每个节点有数据10G分别为:data1,data2,data3,一共30G的文件待处理。在这样情况下,最有的方案是节点A计算data1,节点B计算data2,节点C计算data3。这是我们希望的,如果节点A要计算data2,那么就需要将节点B下的data2通过网络传输发送到节点A,这个消耗可想而知。虽然是内网。相当于我们拷贝10G的数据到U盘在给其他存储器。想法是好的,怎么实现呢?我们提交程序后又主节点,也就是Driver发送监控任务,这个任务就是计算。那我们直接让driver把data1的计算方式发送给节点A让他去计算data1不就好了,以此类推,就实现了本节点计算本节点的数据,这是最优情况了,我们不能总活在想法里面啊,也有其他的情况下需要数据传输。但是Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置,也就是范围内实现最优解。


3、RDD的 *** 作

  既然说Spark数据处理就是RDD之间的转换,那总该说下RDD的 *** 作吧。这篇文章我们先仅说一下关于RDD的创建以及RDD的属性 *** 作。

3.1、创建RDD的两种方式

1、内存加载


makeRDD、parallelize
先说一下:makeRDD比较好记,且makeRDD的底层实现就是parallelize,所以我们就用makeRDD即可。如源码:

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

makeRDD可以传两个参数,第一个是数据 Seq[T],第二个就是分区个数。

    // TODO 创建执行环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    // 设置并行度
    conf.set("spark.default.parallelism", "5")
    val sc = new SparkContext(conf)
    // TODO 从内存里面创建RDD
    val seq = Seq[Int](1, 2, 3, 4, 5, 6)
    // parallelize并行度
    val rdd:RDD[Int] = sc.parallelize(seq)
    // makeRDD底层就是parallelize
    val value = sc.makeRDD(seq)
    rdd.foreach(x=>println(x))
    // TODO 关闭环境
    sc.stop()

在上面,我们可以看到有关于两个地方注释:设置任务并行度,就是字面意思,并发执行的个数。但是设置RDD的并行度优先级高于Conf里面的并行度。具体优先级: RDD分区数>代码中SparkConf的设置(最高的优先级) > spark-submit --选项 > spark-defaults.conf配置 > spark-env.sh配置 > 默认值

2、加载外部数据源

textFile、wholeTextFiles, binaryFiles等
这个方法基本都是有两个参数,第一个是文件路径 ,第二个是分区个数,缺省默认
`在这里插入代码片defaultMinPartitions: Int = math.min(defaultParallelism, 2)详细见源码。

    // TODO 创建执行环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create")
    val sc = new SparkContext(conf)

    
    val rdd:RDD[String] = sc.textFile("data/data.txt",3)


3.2、查看RDD的依赖关系

  
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RddDependenceDemo")
    val sc = new SparkContext(sparkConf)
    val Rdd: RDD[String] = sc.makeRDD(List("hello Spark", "hello Scala"))
    println(Rdd.toDebugString)
    println(Rdd.dependencies)
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(" ")
      strings
    })
    println(rdd.toDebugString)
    println(rdd.dependencies)
    val rdd2: RDD[(String, Int)] = rdd.map(data => (data, 1))
    println(rdd2.toDebugString)
    println(rdd2.dependencies)
        

好啦,暂时结束了,但是关于内容,我会始终纠正补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5634755.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存