【回顾】RDD的持久化

【回顾】RDD的持久化,第1张

【回顾】RDD的持久化

文章目录
  • 1、RDD Cache 缓存
  • 2、RDD CheckPoint 检查点
  • 3、缓存和检查点区别


1、RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

// cache  *** 作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
存储级别
object StorageLevel {
	 val NONE = new StorageLevel(false, false, false, false)
	 val DISK_onLY = new StorageLevel(true, false, false, false)
	 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
	 val MEMORY_onLY = new StorageLevel(false, true, false, true)
	 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
	 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
	 val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
	 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
	 val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
	 val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
	 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
	 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。

Spark 会自动对一些 Shuffle *** 作的中间数据做持久化 *** 作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。

返回顶部


2、RDD CheckPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对 RDD 进行 checkpoint *** 作并不会马上被执行,必须执行 Action *** 作才能触发,并且由检查点生成的缓存文件job执行完成后不会被删除。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("data/t1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
 word => {
 (word, System.currentTimeMillis())
 }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordTooneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

返回顶部


3、缓存和检查点区别

1)Cache 缓存只是将数据保存起来,添加新的依赖,不切断血缘依赖。Checkpoint 检查点切断血缘依赖,执行过程中,会切断血缘关系,建立新的血缘关系。checkPoint等同于改变我们的数据源。

2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。

3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

  • // 会再次执行一个job
    rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

.......

private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {

  
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val level = rdd.getStorageLevel

    // Assume storage level uses disk; otherwise memory eviction may cause data loss
    assume(level.useDisk, s"Storage level $level is not appropriate for local checkpointing")

    // Not all actions compute all partitions of the RDD (e.g. take). For correctness, we
    // must cache any missing partitions. TODO: avoid running another job here (SPARK-8582).
    val action = (tc: TaskContext, iterator: Iterator[T]) => Utils.getIteratorSize(iterator)
    val missingPartitionIndices = rdd.partitions.map(_.index).filter { i =>
      !SparkEnv.get.blockManager.master.contains(RDDBlockId(rdd.id, i))
    }
    if (missingPartitionIndices.nonEmpty) {
      // 会再次执行一个job
      rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
    }

    new LocalCheckpointRDD[T](rdd)
  }

}

返回顶部


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695137.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存