spark中关于partition的简单理解

spark中关于partition的简单理解,第1张

看到一篇关于spark partition的文件,讲的简单易懂通俗,故转。

我们要想对spark中RDD的分区进行一个简单的了解的话,就不免要先了解一下hdfs的前世今生。

众所周知,hdfs是一个非常不错的分布式文件系统,这是这么多年来大家有目共睹的。

hdfs文件为分布式存储,每个文件都被切分为block(默认为128M)。为了达到容错的目的,他们还提供为每个block存放了N个副本(默认为3个)。当然,以上说的这些也可以根据实际的环境业务调整。

多副本除了可以达到容错的目的,也为计算时数据的本地性提供了便捷。当数据所在节点的计算资源不充足时,多副本机制可以不用迁移数据,直接在另一个副本所在节点计算即可。此时看到这里,肯定就有人会问了,那如果所有副本所在的节点计算资源都不充足那该怎么办

问的很好,一般会有一个配置来设置一个等待时长来等待的,假设等待时长为三秒,如果超过三秒,还没有空闲资源,就会分配给别的副本所在节点计算的,如果再别的副本所在节点也需等待且超过了三秒。则就会启动数据迁移了(诸多因素影响,代价就比较大了)。

接下来我们就介绍RDD,RDD是什么d性分布式数据集。

d性:并不是指他可以动态扩展,而是血统容错机制。

分布式:顾名思义,RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上。

再spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。至于后续遇到shuffle的 *** 作,RDD的partition可以根据Hash再次进行划分(一般pairRDD是使用key做Hash再取余来划分partition)。

再spark计算末尾,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。

鉴于上述partition大于128M的情况,在做sparkStreaming增量数据累加时一定要记得调整RDD的分区数。假设,第一次保存RDD时10个partition,每个partition有140M。那么该RDD保存在hdfs上就会有20个block,下一批次重新读取hdfs上的这些数据,RDD的partition个数就会变为20个。再后续有类似union的 *** 作,导致partition增加,但是程序有没有repartition或者进过shuffle的重新分区,这样就导致这部分数据的partition无限增加,这样一直下去肯定是会出问题的。所以,类似这样的情景,再程序开发结束一定要审查需不需要重新分区。

转自:>

RDD 是一个d性的分布式的数据集,是 Spark 中最基础的抽象。它表示了一个可以并行 *** 作的、不可变得、被分区了的元素集合。用户不需要关心底层复杂的抽象处理,直接使用方便的算子处理和计算就可以了。

RDD 示意图:

默认情况下,一个 HDFS 上的数据分片就是一个 partiton,RDD 分片数决定了并行计算的力度,可以在创建 RDD 时指定 RDD 分片个数,如果不指定分区数量,当 RDD 从集合创建时,则默认分区数量为该程序所分配到的资源的 CPU 核数 (每个 Core 可以承载 2~4 个 partition),如果是从 HDFS 文件创建,默认为文件的 Block 数。

有一点非常重要,就是由于 RDD 有前后依赖关系,遇到宽依赖关系,如 reduce By Key 等这些 *** 作时划分成 Stage, Stage 内部的 *** 作都是通过 Pipeline 进行的,在具体处理数据时它会通过 Blockmanager 来获取相关的数据,因为具体的 split 要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split 都会映射成 BlockManager 的 Block,而体的 splt 会被函数处理,函数处理的具体形式是以任务的形式进行的。

并行集合的方式:

引用外部存储系统的数据集 (text、HDFS、Hbase 等):

最简单的理解就是在 RDD 的算子中使用了外部 (Driver 端) 定义的变量。

RDD 每经过一次转换 *** 作都会生成一个新的 RDD,它们之间存在着依赖关系,这种依赖关系被划分成了两种,即 窄依赖 宽依赖

宽窄依赖示意图

当 RDD 触发了 action 算子之后,DAGScheduler 会开始分析最终 RDD 形成的依赖关系,逆向往前推导,前一个 RDD 被看做是父 RDD。每当遇到一个 宽依赖 的时候,便会以此为分界线,划分出一个 Stage。

stage 划分

当一个 RDD 需要被重复使用时,或者当任务失败重新计算的时候,这时如果将 RDD 缓存起来,就可以避免重新计算,保证程序运行的性能。

其实 cache 底层实际调用的就是 persist 方法,只是缓存的级别默认是 MEMORY_ONLY,而 persist 方法可以指定其他的缓存级别。

persist 也可以选择将数据缓存到磁盘当中,但是它交给 blockManager 管理的,一旦程序运行结束,blockManager 也会被停止,这时候缓存的数据就会被释放掉。而 checkPoint 持久化的数据并不会被释放,是一直存在的,可以被其它的程序所使用。

What is RDD

RDD is the spark's core abstraction which is resilient distributed dataset

It is the immutable distributed collection of objects

RDD Creation

RDD vs Dataframe vs Dataset

并行度的解释:

并行度:是 一个spark应用中,每个stage之中的task的数目。
什么是task:

一个spark应用任务执行的最基本单位。

还有语义比较相近的配置参数:
在提交spark 应用的时候,会进行配置的参数

--num-execuotrs 应用启动的executor的数量

--executor-cores 每个executor之中core的数量。

可以将 core 看成是一个空位,task 看成是一个人,

这里的设置的意思就是,有两个executor 每个executor上面有 2个core,

表示每次可以坐四个人,也就是可以跑4个task。
那么问题来了:

1:如何设置并行度?

2:并行度的设置 在多少较为合适?

3:如果不设置并行度,那么并行度由谁来决定?

问题1:
在sparkConf之中进行并行度的配置

问题2:

并行度的设置一般在 core executor (2或者3之间)。

(executor的数量 乘上 core的数量 再乘上 2到3)

并行度不是越大越好,并行度太大的话,可能会造成任务空跑,这样就白白浪费了时间和资源。

问题3:

rdd之中的分区的数目,就会称为应用的并行度。

一个RDD之中的一个partition 对应一个task任务。RDD之中由多少个分区,

那么就会相应地生成多少个task。
一个小注意点:

这里的并行度的设置 是spark core 这个模块的。

对于spark sql 可能不起作用。
然后 executor的数量和core的数量 的设置,就是需要看具体的机器配置,以及愿意给这个任务多少资源去跑。
所以,并行度决定了task的数量,但是值得注意的就是,如果task的数目太多,

可能某个task之中并没有数据,task只是空跑。

把每个partition中的分区号和对应的值拿出来

(1)参数说明:

(2)通过这两个参数,可以定义处理分区的函数。
Iterator[U] : *** 作完成后,返回的结果。

(3)示例:将每个分区中的元素和分区号打印出来。
(a)

(b)创建一个函数返回RDD中的每个分区号和元素:

(c)调用:

含义 :先对局部聚合,再对全局聚合

举例

(1)第一个例子:

(a)需求:将每个分区中的最大值求和,注意:初始值是0;

(b)需求:如果是求和,注意:初始值是0:

(2)第二个例子:一个字符串的例子:

运行结果:

(3)例三:更复杂一点的例子

程序执行分析:

结果可能是:”24”,也可能是:”42”

(4)例四:

程序执行分析:

结果是:”10”,也可能是”01”,

原因:注意有个初始值””,其长度0,然后0toString变成字符串

(5)例5:

结果是:”11”,原因同上

程序执行分析:

(1)准备数据:

(2)两个分区中的元素:

(3) 示例:

(a)将每个分区中的动物最多的个数求和

(b)将每种动物个数求和

(c)这个例子也可以使用:reduceByKey

与reduceByKey相比,aggregateByKey 效率更高

(1)都是将RDD中的分区进行重分区。

(2)区别是:coalesce默认不会进行shuffle(false);而repartition会进行shuffle(true),即:会将数据真正通过网络进行重分区。

(3)示例:

下面两句话是等价的:

参考: >

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/13379698.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-25
下一篇 2023-07-25

发表评论

登录后才能评论

评论列表(0条)

保存