【回顾】Spark核心编程 --- 累加器 Accumulator

【回顾】Spark核心编程 --- 累加器 Accumulator,第1张

【回顾】Spark核心编程 --- 累加器 Accumulator

文章目录
  • 1、实现原理
  • 2、案例检测
  • 3、自定义累加器 --- wordCount


1、实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。


2、案例检测
package test03_rdd.accumulator

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Accumulator {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 并设置 App 名称
    val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
    // 创建 SparkContext,该对象是提交 Spark App 的入口
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 方式一
    val count1 = rdd.reduce(_+_)
    println("count1_reduce",count1)

    // 方式二
    var sum = 0
    val count2 = rdd.foreach(
      num => {
        sum += num
      }
    )
    println("count2_foreach",count2)

    sc.stop()
  }
}

(count1_reduce,10)
(count2_foreach,())

通过上面的小案例我们设法实现累加的功能,reduce之前讲到过,分别进行分区内和分区间的计算,最终生成结果。同样的,按照分析,我们通过遍历数据得到每一个数据,然后通过sum进行累加求和,正常的应该是没有问题的,但是运行的结果却差强人意。

其实还是内部的原因,foreach遍历累加在Executor中进行,没有问题,但是计算好了之后并没有将计算的结果返回给Driver端,而Driver端的sum还是初始值0,println也在Driver端,所以此时输出会有问题。

而累加器的出现,就解决了节点计算,数据回送的问题。在初始的时候创建累加器,并将其发送到Executor端,Executor端计算完成后将累加器返回到Driver端,Driver端再将累加器中的数据进行合并,最终输出。

累加器是一个分布式共享只写变量:分布式在于它可以进行分发计算,共享在于Driver的累加器被多个Executor共享,只写变量在于累加器之间是互相读取不到的,不同的Executor之间累加器是独立的,只有Driver可以访问

使用Spark自带的累加器:

val rdd = sc.makeRDD(List(1, 2, 3, 4))
// 获取系统的累加器
val counter = sc.longAccumulator("sum")
rdd.foreach(
  num => {
    // 使用累加器
    counter.add(num)
  }
)
// 获取累加器的值
println(counter.value)

10

当我们把foreach算子转换为map算子的时候,运行结果又变成了0,这是因为map算子仅是转换算子,在没有触发job作业之前是不会执行的

rdd.map(
  num => {
    // 使用累加器
    counter.add(num)
  }
)

0

当我们将map *** 作转为RDD后,调用两次collect算子,会出现多加的情况。因为,当前累加器是全局的,每调用一次collect算子就会计算一次,两次collect也就会加两次。

val mapRDD = rdd.map(
  num => {
    // 使用累加器
    counter.add(num)
  }
)
mapRDD.collect()
mapRDD.collect()

20

注意:一般情况下,我们在行动算子中使用累加器!

返回顶部


3、自定义累加器 — wordCount

主要逻辑部分:

  • 我们希望通过自定义累加器实现单词的统计,自定义累加器的使用需要向sc进行注册,注册完成后即可使用。
def main(args: Array[String]): Unit = {
  // 创建 SparkConf 并设置 App 名称
  val conf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
  // 创建 SparkContext,该对象是提交 Spark App 的入口
  val sc = new SparkContext(conf)

  // 注册自定义的累加器
  val myAccumulator = new MyAccumulator
  // 向sc注册
  sc.register(myAccumulator, "wc")
  val rdd = sc.makeRDD(List("scala", "spark", "hadoop"))
  rdd.foreach(
    words => {
      myAccumulator.add(words)
    }
  )
  // 输出
  println(myAccumulator.value)
  sc.stop()
}

自定义累加器部分:

  • 在定义累加器的时候我们需要使其继承AccoumulatorV2 ,并同时定义泛型
    • IN:累加器输入的数据类型
    • OUT:累加器返回的数据类型 mutable.Map[String,Long]

我们需要通过传入单词来进行统计,所以输入类型为String,在计算的时候通过单词进行统计,使用Map集合的形式,getOrElse()主要就是防范措施,如果有值,那就可以得到这个值,如果没有就会得到一个默认值;从API中可以看出,传入的参数是(key,default)这种形式,返回值是:如果有key那就get(key),如果没有,就返回default。这样也便于内部的计算。

class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  // 创建接收器
  private var wcMap = mutable.Map[String, Long]()

  // 判断是否为初始状态
  override def isZero: Boolean = {
    wcMap.isEmpty
  }

  // 赋值一份
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new MyAccumulator()
  }

  // 重置(清空)
  override def reset(): Unit = {
    wcMap.clear()
  }

  // 获取累加器需要计算的值
  override def add(word: String): Unit = {
    val newCnt = wcMap.getOrElse(word, 0L) + 1
    wcMap.update(word, newCnt)
  }

  // Driver合并多个累加器的结果  
  // 实际上就是两个mutable.Map的相加计算
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map1 = this.wcMap
    val map2 = other.value
    map2.foreach {
      case (word, count) => {
        val newCount = map1.getOrElse(word, 0L) + count
        map1.update(word, newCount)
      }
    }
  }
  // 累加器的结果
  override def value: mutable.Map[String, Long] = wcMap
}

返回顶部


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5696731.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存