Apache Spark 使用共享变量。当驱动程序向集群执行器发送任务时,集群的每个节点都会收到一份共享变量的副本。如果我们想实现向 MapReduce 上的计数器,显然是不可以的;如果我们想要更新这些副本的值,也无法影响驱动器的对中应变量。Apache Spark 支持两种基本类型的共享变量——累加器和广播。
当我们想要对数据进行关联 *** 作时,可以使用累加器。累加器通过关联和交互 *** 作,可实现计数、求和或求平均的功能。
累加器有两个实现类:
- LongAccumulator :用于计算64位整数的总和、计数和平均值的累加器。
- DoubleAccumulator :用于计算双精度浮点数的和、计数和平均数。
累加器实现步骤:
- 实例化累加器对象
- 使用 SparkContext 注册累加器对象
- 使用累加器进行数据的添加
- 使用 value() 获取累加器的数值
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator") val sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10) // 1. 实例化累加器对象 var acc = new LongAccumulator() // 2. 使用 `SparkContext` 注册累加器对象 sc.register(acc) // 3. 使用累加器进行数据的添加 rdd.map(x => { acc.add(1) }).collect().toList // 4. 使用 `value()` 获取累加器的数值 println(acc.value) }1.2 自定义累加器
自定义累加器的功能提供在在 1.x 版本之后,但是在 2.0 版本之后,累加器的易用性有了较大的改进,并提供了 AccumulatorV2 抽象类。所以自定义累加器,可继承该类,并实现其中的方法。
class My_Accumulator extends AccumulatorV2[Int, Long] { // 创建成员属性用于记录当前累加器的值 var count: Long = 0L override def isZero: Boolean = this.count == 0 override def copy(): AccumulatorV2[Int, Long] = { val acc = new My_Accumulator acc.count = this.count acc } override def reset(): Unit = this.count = 0 override def add(v: Int): Unit = this.count += v override def merge(other: AccumulatorV2[Int, Long]): Unit = { val o = other.asInstanceOf[My_Accumulator] this.count += o.count } override def value: Long = this.count }
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator") val sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10) // 1. 实例化自定义的累加器对象 var acc = new My_Accumulator // 2. 使用 `SparkContext` 注册自定义累加器对象 sc.register(acc) // 3. 使用累加器进行数据的添加 rdd.map(x => { acc.add(1) }).collect().toList // 4. 使用 `value()` 获取累加器的数值 println(acc.value) }2. 广播变量
广播变量(Broadcast)允许 Spark 的不同节点上保存一个安全的只读缓存变量,通过广播变量可高效分发较大的对象。
使用广播变量后,使得每个节点的 executor 中的 cache 才存储副本,就不同为每个 task 创建副本了。
2.1 使用广播变量使用广播变量:
- 对一个类型为 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象
- 通过 value 属性访问该对象的值
- 变量只会被发送到各个节点一次,为避免副本被更改,应当作为只读处理
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("Demo14") val sc = new SparkContext(conf) // 加载黑名单文件放入集合 val source = Source.fromFile("C:\Users\Amos\Desktop\blackList.txt") // 文件大小1GB val blkList: List[String] = source.getLines().toList source.close() // 1. 创建广播对象 val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList) // 加载日志数据创建RDD val rdd = sc.textFile("C:\Users\Amos\Desktop\weblog\access.log-20211107") // 将日志数据通过处理得到 (ip,是否为黑名单用户) rdd .repartition(10) .map(line => { val ip = line.split(" ").head //2. 需要使用时 从公共缓存中读取对象 val list = bc_blkList.value (ip, if (list.contains(ip)) 1 else 0) }) .foreach(println) }
❤️ END ❤️
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)