Spark 累加器、广播变量

Spark 累加器、广播变量,第1张

Spark 累加器、广播变量 1. 累加器

Apache Spark 使用共享变量。当驱动程序向集群执行器发送任务时,集群的每个节点都会收到一份共享变量的副本。如果我们想实现向 MapReduce 上的计数器,显然是不可以的;如果我们想要更新这些副本的值,也无法影响驱动器的对中应变量。Apache Spark 支持两种基本类型的共享变量——累加器和广播。

当我们想要对数据进行关联 *** 作时,可以使用累加器。累加器通过关联和交互 *** 作,可实现计数、求和或求平均的功能。

累加器有两个实现类:

  • LongAccumulator :用于计算64位整数的总和、计数和平均值的累加器。
  • DoubleAccumulator :用于计算双精度浮点数的和、计数和平均数。
1.1 使用累加器

累加器实现步骤:

  1. 实例化累加器对象
  2. 使用 SparkContext 注册累加器对象
  3. 使用累加器进行数据的添加
  4. 使用 value() 获取累加器的数值
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(1 to 10)

    // 1. 实例化累加器对象
    var acc = new LongAccumulator()

    // 2. 使用 `SparkContext` 注册累加器对象
    sc.register(acc)

    // 3. 使用累加器进行数据的添加
    rdd.map(x => {
        acc.add(1)
    }).collect().toList

    // 4. 使用 `value()` 获取累加器的数值
    println(acc.value)

}
1.2 自定义累加器

自定义累加器的功能提供在在 1.x 版本之后,但是在 2.0 版本之后,累加器的易用性有了较大的改进,并提供了 AccumulatorV2 抽象类。所以自定义累加器,可继承该类,并实现其中的方法。

class My_Accumulator extends AccumulatorV2[Int, Long] {
    // 创建成员属性用于记录当前累加器的值
    var count: Long = 0L

    
    override def isZero: Boolean = this.count == 0

    
    override def copy(): AccumulatorV2[Int, Long] = {
        val acc = new My_Accumulator
        acc.count = this.count
        acc
    }

    
    override def reset(): Unit = this.count = 0

    
    override def add(v: Int): Unit = this.count += v

    
    override def merge(other: AccumulatorV2[Int, Long]): Unit = {
        val o = other.asInstanceOf[My_Accumulator]
        this.count += o.count
    }

    
    override def value: Long = this.count

}
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(1 to 10)

    // 1. 实例化自定义的累加器对象
    var acc = new My_Accumulator

    // 2. 使用 `SparkContext` 注册自定义累加器对象
    sc.register(acc)

    // 3. 使用累加器进行数据的添加
    rdd.map(x => {
        acc.add(1)
    }).collect().toList

    // 4. 使用 `value()` 获取累加器的数值
    println(acc.value)

}
2. 广播变量

广播变量(Broadcast)允许 Spark 的不同节点上保存一个安全的只读缓存变量,通过广播变量可高效分发较大的对象。

使用广播变量后,使得每个节点的 executor 中的 cache 才存储副本,就不同为每个 task 创建副本了。

2.1 使用广播变量

使用广播变量:

  • 对一个类型为 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象
  • 通过 value 属性访问该对象的值
  • 变量只会被发送到各个节点一次,为避免副本被更改,应当作为只读处理
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Demo14")
    val sc = new SparkContext(conf)


    // 加载黑名单文件放入集合
    val source = Source.fromFile("C:\Users\Amos\Desktop\blackList.txt")
    //   文件大小1GB
    val blkList: List[String] = source.getLines().toList
    source.close()

    // 1. 创建广播对象
    val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)

    // 加载日志数据创建RDD
    val rdd = sc.textFile("C:\Users\Amos\Desktop\weblog\access.log-20211107")
    // 将日志数据通过处理得到  (ip,是否为黑名单用户)
    rdd
    .repartition(10)
    .map(line => {
        val ip = line.split(" ").head
        //2. 需要使用时  从公共缓存中读取对象
        val list = bc_blkList.value
        (ip, if (list.contains(ip)) 1 else 0)
    })
    .foreach(println)

}

 


❤️ END ❤️

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5665106.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存