所谓的检查点其实就是通过将 RDD 中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint *** 作并不会马上被执行,必须执行 Action *** 作才能触发。
sc.setCheckpointDir("/checkpoint1") val lineRdd: RDD[String] = sc.textFile("input/1.txt") val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" ")) val wordToOneRdd: RDD[(String, Long)] = wordRdd.map { word => { (word, System.currentTimeMillis()) } } wordToOneRdd.cache() wordToOneRdd.checkpoint() wordToOneRdd.collect().foreach(println)
缓存和检查点区别
Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。
将数据长久地保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全。为了保证数据安全,所以一般情况下,会独立执行作业。为了能够提高效率,一般情况下,是需要和cache联合使用。执行过程中,会切断血缘关系。重新建立新的血缘关checkpoint等同于改变数据源。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)