文章目录
- 1、实现原理
- 2、案例检测
- 3、自定义累加器 --- wordCount
1、实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
2、案例检测
package test03_rdd.accumulator import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Accumulator { def main(args: Array[String]): Unit = { // 创建 SparkConf 并设置 App 名称 val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator") // 创建 SparkContext,该对象是提交 Spark App 的入口 val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 方式一 val count1 = rdd.reduce(_+_) println("count1_reduce",count1) // 方式二 var sum = 0 val count2 = rdd.foreach( num => { sum += num } ) println("count2_foreach",count2) sc.stop() } } (count1_reduce,10) (count2_foreach,())
通过上面的小案例我们设法实现累加的功能,reduce之前讲到过,分别进行分区内和分区间的计算,最终生成结果。同样的,按照分析,我们通过遍历数据得到每一个数据,然后通过sum进行累加求和,正常的应该是没有问题的,但是运行的结果却差强人意。
其实还是内部的原因,foreach遍历累加在Executor中进行,没有问题,但是计算好了之后并没有将计算的结果返回给Driver端,而Driver端的sum还是初始值0,println也在Driver端,所以此时输出会有问题。
而累加器的出现,就解决了节点计算,数据回送的问题。在初始的时候创建累加器,并将其发送到Executor端,Executor端计算完成后将累加器返回到Driver端,Driver端再将累加器中的数据进行合并,最终输出。
累加器是一个分布式共享只写变量:分布式在于它可以进行分发计算,共享在于Driver的累加器被多个Executor共享,只写变量在于累加器之间是互相读取不到的,不同的Executor之间累加器是独立的,只有Driver可以访问
使用Spark自带的累加器:
val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 获取系统的累加器 val counter = sc.longAccumulator("sum") rdd.foreach( num => { // 使用累加器 counter.add(num) } ) // 获取累加器的值 println(counter.value) 10
当我们把foreach算子转换为map算子的时候,运行结果又变成了0,这是因为map算子仅是转换算子,在没有触发job作业之前是不会执行的
rdd.map( num => { // 使用累加器 counter.add(num) } ) 0
当我们将map *** 作转为RDD后,调用两次collect算子,会出现多加的情况。因为,当前累加器是全局的,每调用一次collect算子就会计算一次,两次collect也就会加两次。
val mapRDD = rdd.map( num => { // 使用累加器 counter.add(num) } ) mapRDD.collect() mapRDD.collect() 20
注意:一般情况下,我们在行动算子中使用累加器!
返回顶部
3、自定义累加器 — wordCount
主要逻辑部分:
- 我们希望通过自定义累加器实现单词的统计,自定义累加器的使用需要向sc进行注册,注册完成后即可使用。
def main(args: Array[String]): Unit = { // 创建 SparkConf 并设置 App 名称 val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator") // 创建 SparkContext,该对象是提交 Spark App 的入口 val sc = new SparkContext(conf) // 注册自定义的累加器 val myAccumulator = new MyAccumulator // 向sc注册 sc.register(myAccumulator, "wc") val rdd = sc.makeRDD(List("scala", "spark", "hadoop")) rdd.foreach( words => { myAccumulator.add(words) } ) // 输出 println(myAccumulator.value) sc.stop() }
自定义累加器部分:
- 在定义累加器的时候我们需要使其继承AccoumulatorV2 ,并同时定义泛型
- IN:累加器输入的数据类型
- OUT:累加器返回的数据类型 mutable.Map[String,Long]
我们需要通过传入单词来进行统计,所以输入类型为String,在计算的时候通过单词进行统计,使用Map集合的形式,getOrElse()主要就是防范措施,如果有值,那就可以得到这个值,如果没有就会得到一个默认值;从API中可以看出,传入的参数是(key,default)这种形式,返回值是:如果有key那就get(key),如果没有,就返回default。这样也便于内部的计算。
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { // 创建接收器 private var wcMap = mutable.Map[String, Long]() // 判断是否为初始状态 override def isZero: Boolean = { wcMap.isEmpty } // 赋值一份 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() } // 重置(清空) override def reset(): Unit = { wcMap.clear() } // 获取累加器需要计算的值 override def add(word: String): Unit = { val newCnt = wcMap.getOrElse(word, 0L) + 1 wcMap.update(word, newCnt) } // Driver合并多个累加器的结果 // 实际上就是两个mutable.Map的相加计算 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = this.wcMap val map2 = other.value map2.foreach { case (word, count) => { val newCount = map1.getOrElse(word, 0L) + count map1.update(word, newCount) } } } // 累加器的结果 override def value: mutable.Map[String, Long] = wcMap }
返回顶部
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)