六十四、Spark-分别统计各个单词个数及特殊字符总个数

六十四、Spark-分别统计各个单词个数及特殊字符总个数,第1张

六十四、Spark-分别统计各个单词个数及特殊字符总个数 共享变量

广播变量(Broadcast Variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,简单理解:减少内存,减小计算压力;

累加器(Accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

需求说明:以词频统计WordCount程序为例,处理特殊数据,包括非单词符号,做WordCount的同时统计出特殊字符的数量

目录

共享变量

原数据展示

业务逻辑

完整代码

程序运行

项目总结


原数据展示

         注:原数据杂乱无章,与所需单词混淆,间隔且不等

业务逻辑

1、创建本地环境,并设置日志提示级别

val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")

2、创建累加器

val mycounter: LongAccumulator = sc.longAccumulator("mycounter")

3、定义特殊字符集合

val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")

4、将集合作为广播变量广播到各个节点

val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)

5、加载数据,创建RDD

val lines: RDD[String] = sc.textFile("data/input/words2.txt")

6、过滤筛选

val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split("\s+"))
      .filter(ch => {
        //获取广播数据
        val list: List[String] = broadcast.value
        if (list.contains(ch)) { //特殊字符
          mycounter.add(1)
          false
        } else { //单词
          true
        }
      }).map((_, 1))
      .reduceByKey(_ + _)

7、输出单词统计及特殊字符

wordcountResult.foreach(println)
val chResult: lang.Long = mycounter.value
println("特殊字符的数量:" + chResult)

完整代码
package org.example.spark

import java.lang
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}


object RDD_ShareVariable {
  def main(args: Array[String]): Unit = {
    //TODO 0.env/创建环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //创建计数器/累加器
    val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
    //定义一个特殊字符集合
    val ruleList: List[String] = List(",", ".", "!", "#", "$", "`", "~", "@", "?", "*", "^")
    //将集合作为广播变量广播到各个节点
    val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)

    //TODO 1.source/加载数据/创建RDD
    val lines: RDD[String] = sc.textFile("data/input/words2.txt")

    //TODO 2.transformation
    val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split("\s+"))
      .filter(ch => {
        //获取广播数据
        val list: List[String] = broadcast.value
        if (list.contains(ch)) { //特殊字符
          mycounter.add(1)
          false
        } else { //单词
          true
        }
      }).map((_, 1))
      .reduceByKey(_ + _)
    
    //TODO 3.sink/输出
    wordcountResult.foreach(println)
    val chResult: lang.Long = mycounter.value
    println("特殊字符的数量:" + chResult)
  }
}

程序运行

原数据:

控制台打印:

         注:通过对比,该程序实现了单词与特殊字符的分别统计

项目总结

使用广播变量的好处:

1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的后果。
2、使用广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少;

 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717047.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存