目录
Cache缓存
CheckPoint检查点
累加器、广播变量
Cache缓存
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates RDD caching.
 *
 * The RDD is marked with `cache()` (i.e. `persist(StorageLevel.MEMORY_ONLY)`);
 * it is actually materialized on the first action. The timings printed show that
 * the first `count` pays the full read cost, the second and third hit the cache,
 * and after `unpersist()` the fourth `count` recomputes from the source file.
 */
object CacheDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cacheDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // cache() only marks the RDD; nothing is stored until the first action runs.
    val rdd1: RDD[String] = sc.textFile("in/users.csv").cache()

    // 1st count: reads the file and populates the cache.
    var start: Long = System.currentTimeMillis()
    println(rdd1.count())
    var end: Long = System.currentTimeMillis()
    println("count 操作花费的时间:" + (end - start) + "毫秒")

    // 2nd count: served from the in-memory cache, so it is much faster.
    start = System.currentTimeMillis()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("第二次count 操作花费的时间:" + (end - start) + "毫秒")

    // 3rd count: still served from the cache.
    start = System.currentTimeMillis()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("第三次count 操作花费的时间:" + (end - start) + "毫秒")

    // unpersist() drops the cached blocks, so the 4th count recomputes from disk.
    start = System.currentTimeMillis()
    rdd1.unpersist()
    println(rdd1.count())
    end = System.currentTimeMillis()
    println("第四次count 操作花费的时间:" + (end - start) + "毫秒")
  }
}
CheckPoint检查点
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates RDD checkpointing.
 *
 * `checkpoint()` must be called before the first action on the RDD; the
 * checkpoint is written when that action runs, truncating the lineage.
 * The checkpointed data is then read back as a plain text RDD to show that
 * it survives independently of the original lineage.
 */
object CheckPointDemo {
  def main(args: Array[String]): Unit = {
    // NOTE: appName was "cacheDemo" (copy-paste leftover); fixed for consistency.
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("checkPointDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // The checkpoint directory must be set before checkpoint() is called.
    sc.setCheckpointDir("file:///D://KB15checkpoint")

    // val rdd: RDD[(String, Int)] = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
    val rdd = sc.textFile("in/users.csv")
    rdd.checkpoint()            // mark for checkpointing; written on the next action
    println(rdd.count())        // action: triggers the checkpoint write
    println("是否是checkpoint" + rdd.isCheckpointed)
    println("检查点文件" + rdd.getCheckpointFile)
    println("--------------------------------------------------")

    // Read the checkpoint data back. getCheckpointFile is an Option[String];
    // pattern match instead of calling .get (which would throw if absent).
    rdd.getCheckpointFile match {
      case Some(path) =>
        val rdd2: RDD[String] = sc.textFile(path)
        println(rdd2.count())
        rdd2.collect().foreach(println)
      case None =>
        println("checkpoint file not found")
    }
  }
}
累加器、广播变量
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

/**
 * Demonstrates accumulators (and, in the commented section, broadcast variables).
 *
 * Key point: a plain driver-side `var` captured in a closure is copied to each
 * executor task; increments on the executors never flow back to the driver.
 * An accumulator is the correct way to aggregate values from tasks.
 */
object BroadCastDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("broadCastDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    // BUG FIX: this declaration was commented out while myAccumul was still
    // used below, so the sample did not compile. The accumulator must exist.
    val myAccumul: Accumulator[Int] = sc.accumulator(0, "MyAccumul")

    // mycount is captured by value in the task closure: each task mutates its
    // own copy, so the driver-side value printed afterwards is unchanged.
    var mycount = 1
    sc.parallelize(Array(1, 2, 3, 4)).foreach(
      x => {
        println("x:" + x)
        println("mycount: " + mycount)
        mycount += 1
        println("myAccumul: " + myAccumul)
        myAccumul += x          // accumulator updates ARE sent back to the driver
      }
    )
    println(mycount, myAccumul)

    //    val arr = Array("hello", "hi", "come on baby")
    //    val broadCastVar: Broadcast[Array[String]] = sc.broadcast(arr) // do not broadcast large data; prepare it up front
    //
    //    val hi = "how are you"
    //
    //    val rdd = sc.parallelize(Array((1, "teacher"), (2, "worker"), (3, "teamleader")))
    //
    //    val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {
    //      println("value is :" + x)
    //      hi + ":" + x                   // closure carries its own copy of the data
    //      broadCastVar.value(2) + ":" + x // broadcast data is already shipped to executors
    //    })
    //
    //    rdd2.collect().foreach(println)
  }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)