2021.12.8 RDD Persistence: Spark Cache and CheckPoint; RDD Shared Variables: Accumulators and Broadcast Variables


Contents

 Cache

 CheckPoint

 Accumulators and Broadcast Variables


 

 Cache
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {

    val conf:SparkConf=new SparkConf().setMaster("local[*]").setAppName("cacheDemo")
    val sc:SparkContext=SparkContext.getOrCreate(conf)


    // cache() marks rdd1 for in-memory caching; the data is only
    // materialized the first time an action runs on it
    val rdd1: RDD[String] = sc.textFile("in/users.csv").cache()

    var start: Long = System.currentTimeMillis()
    println(rdd1.count())
    var end: Long = System.currentTimeMillis()
    println("first count took: " + (end - start) + " ms")

    // later counts read from the cache and should be much faster
    start = System.currentTimeMillis()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("second count took: " + (end - start) + " ms")

    start = System.currentTimeMillis()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("third count took: " + (end - start) + " ms")

    // after unpersist() the cached data is dropped, so this count
    // re-reads the file and is slow again
    rdd1.unpersist()
    start = System.currentTimeMillis()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("fourth count took: " + (end - start) + " ms")

  }

}
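
cache() is just shorthand for persist(StorageLevel.MEMORY_ONLY). When the dataset may not fit in memory, an explicit storage level can be passed to persist() instead. A minimal sketch, reusing the sc and input file from the demo above:

import org.apache.spark.storage.StorageLevel

// same caching idea, but partitions that don't fit in memory spill to disk
val rdd: RDD[String] = sc.textFile("in/users.csv")
  .persist(StorageLevel.MEMORY_AND_DISK)
println(rdd.count()) // the first action materializes the persisted data
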
 CheckPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckPointDemo {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("checkpointDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // checkpoint files are written under this directory
    sc.setCheckpointDir("file:///D:/KB15checkpoint")
//    val rdd:RDD[(String,Int)]=sc.parallelize(Array(("a",1),("b",2),("c",3)))
    val rdd = sc.textFile("in/users.csv")
    rdd.checkpoint()
    println(rdd.count()) // action: triggers the checkpoint write
    println("isCheckpointed: " + rdd.isCheckpointed)
    println("checkpoint file: " + rdd.getCheckpointFile)

    println("--------------------------------------------------")

//    val rdd2:RDD[String]=sc.textFile("file:/D:\KB15checkpoint67856c-a458-4e63-bde8-68e1dc10ea80\rdd-0")
    // read the checkpoint files back as an ordinary text RDD
    val rdd2: RDD[String] = sc.textFile(rdd.getCheckpointFile.get)
    println(rdd2.count())
    rdd2.collect().foreach(println)

  }

}
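
One detail worth knowing: the action that triggers a checkpoint recomputes the RDD's lineage a second time to write the checkpoint files, so the Spark docs recommend persisting the RDD first. A minimal sketch, with the same sc and checkpoint directory as above:

val rdd = sc.textFile("in/users.csv")
rdd.cache()          // keep the data in memory so the checkpoint write
rdd.checkpoint()     // doesn't recompute the whole lineage from the file
println(rdd.count()) // one action fills the cache and writes the checkpoint
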
 Accumulators and Broadcast Variables
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object BroadCastDemo {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("broadcastDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // accumulator registered with the driver (old Spark 1.x API)
    val myAccumul: Accumulator[Int] = sc.accumulator(0, "MyAccumul")

    var mycount = 1

    sc.parallelize(Array(1, 2, 3, 4)).foreach(
      x => {
        println("x: " + x)
        // mycount is a plain driver variable: each task mutates its own
        // deserialized copy, so the increments never reach the driver
        println("mycount: " + mycount)
        mycount += 1
        println("myAccumul: " + myAccumul)
        // accumulator updates, by contrast, are merged back on the driver
        myAccumul += x
      }
    )

    // prints (1,10): mycount is unchanged on the driver, the accumulator is not
    println(mycount, myAccumul)

    val arr = Array("hello", "hi", "come on baby")
    // don't broadcast anything too large; the point is to ship shared
    // read-only data to every executor once, ahead of time
    val broadCastVar: Broadcast[Array[String]] = sc.broadcast(arr)

    val hi = "how are you"

    val rdd = sc.parallelize(Array((1, "teacher"), (2, "worker"), (3, "teamleader")))

    val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {
      println("value is: " + x)
//      hi + ":" + x                  // closure capture: every task carries its own copy
      broadCastVar.value(2) + ":" + x // broadcast: the data is already on the executor
    })

    rdd2.collect().foreach(println)

  }

}
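
Accumulator[Int] and sc.accumulator used above are the old Spark 1.x API and have been deprecated since Spark 2.0. A minimal sketch of the same counter with the newer built-in LongAccumulator, assuming Spark 2.x:

import org.apache.spark.util.LongAccumulator

val acc: LongAccumulator = sc.longAccumulator("MyAccumul")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc.add(x)) // updates are merged on the driver
println(acc.value) // 10 -- read the value only on the driver, after the action
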
