Spark(七)——累加器和广播变量

Spark(七)——累加器和广播变量,第1张

Spark(七)——累加器和广播变量 5、累加器

通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。 Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 
 驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。 注意:工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 对于要在行动 *** 作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动 *** 作中。转化 *** 作中累加器可能会发生不止一次更新。

object SparkCoreDemo13_Accumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Demo13")
    val sc = new SparkContext(conf)
​
    var count = 0
    // 1. 创建累加器对象
    val acc = new LongAccumulator
    val myAcc = new MyAccumulator
    // 2. 使用sc注册累加器
    sc.register(acc)
    sc.register(myAcc)
​
    val rdd = sc.makeRDD(1 to 10)
​
    //    val sum = rdd.reduce(_ + _)
    //    val count1 = rdd.count()
    //    sum / count1.toDouble
​
    val rdd1 = rdd.map(x => {
      count += 1
      // 3. 使用累加器对象进行数据的添加
      acc.add(1)
      myAcc.add(x)
      println(s"acc value in map: ${acc.value} --" + Thread.currentThread().getId)
      println(s"count in map: $count---" + Thread.currentThread().getId)
      x + 1
    })
​
    println(rdd1.collect().toList)
​
    // 4. 使用value()获取累加器的值
    println(s"acc value in main: ${acc.value} --" + Thread.currentThread().getId)
    println(s"count in main: $count--" + Thread.currentThread().getId)
    // 上面的代码中
    // 如果想通过创建在Driver中的局部变量统计RDD 算子的执行次数
    // 最终无法获取到执行次数
    //   因为RDD的算子 *** 作是在Driver中进行编译
    //    并真正提交到执行器(Executor)中的任务线程(Task)中执行
    //     每个线程(Task)都会保有一份属于自己线程的局部变量
    //     最终Driver程序中的局部变量没有参与任何运算
​
    // Spark提供了Accumulator 累加器对象  用于方便的进行分布式聚合(计数)
​
    // AccumulatorV2
    //    add(对象)  将对象添加到累加器中
    //   对象 =  value()  获取累加器中的值
​
    println(myAcc.value)
​
  }
}

自定义累加器

//                               [IN,OUT] 累加器的输入对象
//                                        累加器的输出对象
class MyAccumulator extends AccumulatorV2[Int, Double] {
  // 创建成员属性用于记录当前累加器的值
  var count: Long = 0L
  var sum: Long = 0L
​
  
  override def isZero: Boolean = this.count == 0 && this.sum == 0
​
  
  override def copy(): AccumulatorV2[Int, Double] = {
    val accumulator = new MyAccumulator
    accumulator.count = this.count
    accumulator.sum = this.sum
    accumulator
  }
​
  
  override def reset(): Unit = {
    this.count = 0
    this.sum = 0
  }
​
  
  override def add(v: Int): Unit = {
    this.count += 1
    this.sum += v
  }
​
  
  override def merge(other: AccumulatorV2[Int, Double]): Unit = {
    val o = other.asInstanceOf[MyAccumulator]
    this.count += o.count
    this.sum += o.sum
  }
​
  
  override def value: Double = this.sum.toDouble / this.count
}
6、广播变量

使用广播变量的过程如下:

(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。

(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。

(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

package com.zch.spark.core
​
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
​
import scala.io.Source
​

object SparkCoreDemo14_BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Demo14")
    val sc = new SparkContext(conf)
​
​
    // 加载黑名单文件放入集合
    val source = Source.fromFile("C:\Users\Amos\Desktop\blackList.txt")
    //   文件大小1GB
    val blkList: List[String] = source.getLines().toList
    source.close()
​
    // 1. 创建广播对象
    val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)
​
    // 加载日志数据创建RDD
    val rdd = sc.textFile("C:\Users\Amos\Desktop\weblog\access.log-20211107")
    // 将日志数据通过处理得到  (ip,是否为黑名单用户)
    rdd
      .repartition(10)
      .map(line => {
        val ip = line.split(" ").head
        //2. 需要使用时  从公共缓存中读取对象
        val list = bc_blkList.value
        (ip, if (list.contains(ip)) 1 else 0)
      })
      .foreach(println)
​
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5677459.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存