RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。
RDD通过 persist 方法或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看源码发现cache终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使 缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各 个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Demo示例:
容错机制
分别举例说明:
本地目录 注意:这种模式,需要将spark-shell运行在本地模式上
HDFS的目录 注意:这种模式,需要将spark-shell运行在集群模式上
源码中的一段话
Lineage(血统)
RDD的依赖关系
窄依赖指的是每一个父RDD的Partition多被子RDD的一个Partition使用
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
Spark任务中的Stage DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据 RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完 成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计 算, 因此宽依赖是划分Stage的依据 。
把每个partition中的分区号和对应的值拿出来
接收一个函数参数:
创建一个函数返回RDD中的每个分区号和元素:
调用:
先对局部聚合,再对全局聚合
查看每个分区中的元素:
将每个分区中的大值求和,注意:初始值是0;
如果初始值时候10,则结果为:30
如果是求和,注意:初始值是0:
如果初始值是10,则结果是:45
一个字符串的例子:
修改一下刚才的查看分区元素的函数
两个分区中的元素:
运行结果:
更复杂一点的例子
结果可能是:”24”,也可能是:”42”
结果是:”10”,也可能是”01”
原因:注意有个初始值””,其长度0,然后0.toString变成字符串
结果是:”11”,原因同上。
准备数据:
两个分区中的元素:
示例:将每个分区中的动物多的个数求和
将每种动物个数求和
这个例子也可以使用:reduceByKey
都是将RDD中的分区进行重分区。 区别是:coalesce默认不会进行shuffle(false);而repartition会进行shuffle(true),即:会将数 据真正通过网络进行重分区。
示例:
下面两句话是等价的:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)