SparkCore

SparkCore,第1张

一、RDD概述 1.RDD引入IO流

IO流的方式读取数据

1、字节流的方式读取数据(一次读取一个字节)

InputStream in = new FileInputStream("input")

2、字节缓冲流的方式读取数据(把数据缓存起来,批量读取)

InputStream in = new BufferedInputStream(FileInputStream("input"))

3、一次读取一行的缓存流的方式读取数据

BufferedReader reader = new BufferedReader(new  InputStreamReader( new FileInputStream("input"), "UTF-8") )

 流的读取,都是在调用read。称之为惰性加载。

IO流和RDD之间关系

//读取外部文件
val textRDD: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\")
//对读取到的内容进行切割并进行扁平化处理
val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
//对数据集中的内容进行结构转换
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
//对相同的单词key的value进行聚合
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
//将执行结果进行收集
val res: Array[(String, Int)] = reduceRDD.collect()
res.foreach(println)

1)产生一个lineRdd

2)flatMap包装了lineRdd,返回wordRdd

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f)  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}

3)map包装了wordRdd ,返回wordToOneRdd

def map[U: ClassTag](f: T => U): RDD[U] = withScope {  val cleanF = sc.clean(f)  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

4)只有最后调用collect()才会执行

2.RDD

        RDD(Resilient Distributed Dataset)叫做d性分布式数据集,是Spark中最基本的数据抽象。 代码中是一个抽象类,它代表一个d性的、不可变、可分区、里面的元素可并行计算的集合。

1)d性
      存储的d性:内存与磁盘的自动切换;
      容错的d性:数据丢失可以自动恢复;
      计算的d性:计算出错重试机制;
      分片的d性:可根据需要重新分片。

2)分布式
       数据存储在大数据集群不同节点上

3)数据集
       RDD封装了计算逻辑,并不保存数据

4)数据抽象
       RDD是一个抽象类,需要子类具体实现

5)不可变
        RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑

6)可分区、并行计算

 3.RDD特性
 Internally, each RDD is characterized by five main properties:

  - A list of partitions
  - A function for computing each split
  - A list of dependencies on other RDDs
  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    an HDFS file)

(1)一组分区(Partition),即是数据集的基本组成单位;

protected def getPartitions:Array[Partition]

(2)一个计算每个分区的函数;

def compute(split: Partition, context: TaskContext): Interator[T]

(3)RDD之间的依赖关系;

protected def getDependencies: Seq[Dependency[_]] = deps

(4)一个Partitioner,即RDD的分片函数;控制分区的数据流向(键值对)

val partitioner : scala.Option[org.apache.spark.Partitioner]

(5)一个列表,存储存取每个Partition的优先位置(preferred location)。 移动数据不如移动计算,除非资源不够。

protected def getPreferredLocations(split : Partition) : scala.Seq[String]

A list of partitions

多个分区,分区可以看成是数据集的基本组成单位

对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。

用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的 CPU Core 的数目.

每个分配的存储是由BlockManager 实现的, 每个分区都会被逻辑映射成 BlockManager 的一个 Block,,而这个 Block 会被一个 Task 负责计算。

A function for computing each split

  • (分区)的函数.
  1. 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
  • A list of dependencies on other RDDs
  • RDD 之间的依赖关系
  1. 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • RDD,还有一个可选的分区器
  • key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

  • (preferred location)位置的列表
  • HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
二、RDD编程

RDD编程_asd623444055的博客-CSDN博客

三、数据读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件Json文件、Csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS以及数据库。

1.文件类数据读取与保存 1.Text文件

1)数据读取:textFile(String)

2)数据保存:saveAsTextFile(String)

3)代码实现

object Operate_Text {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 读取输入文件
        val inputRDD: RDD[String] = sc.textFile("input/1.txt")

        //3.2 保存数据
        inputRDD.saveAsTextFile("output")

        //4.关闭连接
        sc.stop()
    }
}

4)注意:如果是集群路径:hdfs://hadoop102:8020/input/1.txt

2.Json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。

1)数据准备

input目录下创建1.txt文件,里面存储如下内容

  {"name": "zhangsan","age": 10}
  {"name": "lisi","age": 19}
  {"name": "wangwu","age": 25}

2)代码实现


/**
 * 读取Json
 */
object Spark03_readJson {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)


    val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\test.json")
    val resRDD: RDD[Option[Any]] = rdd.map(JSON.parseFull)
    resRDD.foreach(println)//  {"name": "zhangsan","age": 18}
                           //  {"name": "lisi","age": 19}


    //释放资源
    sc.stop()
  }
}

注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

​​​​​​​3.Sequence文件

        SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

1)代码实现

object Operate_Sequence {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))

        //3.2 保存数据为SequenceFile
        dataRDD.saveAsSequenceFile("output")

        //3.3 读取SequenceFile文件
        sc.sequenceFile[Int,Int]("output").collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

2注意:SequenceFile文件只针对PairRDD

4.Object对象文件​​​​​​​

        对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

1)代码实现

object Operate_Object {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))

        //3.2 保存数据
        dataRDD.saveAsObjectFile("output")

        //3.3 读取数据
        sc.objectFile[(Int)]("output").collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}
2.文件系统类数据读取与保存 1.HDFS

        Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建 *** 作接口。对于外部存储创建 *** 作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口。

2.MySQL

支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:

(1)添加依赖


    mysql
    mysql-connector-java
    5.1.27

(2)从Mysql读取数据

package com.spark.day06

import java.sql.DriverManager

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.parsing.json.JSON

/**
 * 读取MySQL数据
 *
 * sc: SparkContext,                      spark程序执行入口,上下文对象
 * getConnection: () => Connection,       获取数据库连接
 * sql: String,                           执行SQL语句
 * lowerBound: Long,                      查询的起始位置
 * upperBound: Long,                      查询的结束位置
 * numPartitions: Int,                    分区数
 * mapRow: (ResultSet) => T               对结果集的处理
 *
 *
 * 注册驱动
 * 获取连接
 * 创建数据库 *** 作对象PrepareStatement
 * 执行SQL
 * 处理结果集
 * 关闭连接
 */
object Spark04_MySQL_read {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    //数据库连接4要素
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://hadoop102:3306/test"
    var username = "root"
    var password = "000000"

    var sql:String = "select * from user where id >= ? and id <= ?"

    val resRDD = new JdbcRDD(
      sc,
      () => {
        //注册驱动
        Class.forName(driver)
        //获取连接
        DriverManager.getConnection(url, username, password)
      },
      sql, 3, 20,
      2,
      rs => (rs.getInt(1), rs.getString(2), rs.getInt(3))

    )

    resRDD.collect().foreach(println)
    //(3,wangwu,20)
    //(4,zhaoliu,21)

    //释放资源
    sc.stop()
  }
}

(3)往Mysql写入数据

package com.spark.day06

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 向MySQL写入数据
 *
 * sc: SparkContext,                      spark程序执行入口,上下文对象
 * getConnection: () => Connection,       获取数据库连接
 * sql: String,                           执行SQL语句
 * lowerBound: Long,                      查询的起始位置
 * upperBound: Long,                      查询的结束位置
 * numPartitions: Int,                    分区数
 * mapRow: (ResultSet) => T               对结果集的处理
 *
 *
 * 注册驱动
 * 获取连接
 * 创建数据库 *** 作对象PrepareStatement
 * 执行SQL
 * 处理结果集
 * 关闭连接
 */
object Spark05_MySQL_write {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    //数据库连接4要素
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://hadoop102:3306/test"
    var username = "root"
    var password = "000000"

    val rdd: RDD[(Int, String, Int)] = sc.makeRDD(List((9, "shiyi", 24), (10, "shier", 25)))

    //在循环体中创建连接对象,每次变量出RDD中的一个元素都要创建一个连接对象,效率较低,不推荐使用
    /*rdd.foreach {
      case (id, name, age) => {
        //注册驱动
        Class.forName(driver)
        //获取连接
        val conn: Connection = DriverManager.getConnection(url, username, password)
        //声明数据库 *** 作的SQL语句
        var sql: String = "insert into user(id,name,age)values(?,?,?)"
        //创建数据库 *** 作对象PrepareStatement
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //给参数赋值
        ps.setInt(1,id)
        ps.setString(2,name)
        ps.setInt(3,age)
        //执行SQL
        ps.executeUpdate()
        //关闭连接
        ps.close()
        conn.close()
      }
    }

     */
    //在循环体外创建连接对象,需要让ps实现序列化,但是Ps不是自定义的类型,没有办法实现
    //注册驱动
    /*Class.forName(driver)
    //获取连接
    val conn: Connection = DriverManager.getConnection(url, username, password)
    //声明数据库 *** 作的SQL语句
    var sql: String = "insert into user(id,name,age)values(?,?,?)"
    //创建数据库 *** 作对象PrepareStatement
    val ps: PreparedStatement = conn.prepareStatement(sql)
    //获取分区数据
    rdd.foreach {
      case (id, name, age) => {
        //给参数赋值
        ps.setInt(1,id)
        ps.setString(2,name)
        ps.setInt(3,age)
        //执行SQL
        ps.executeUpdate()
      }
    }
    //关闭连接
    ps.close()
    conn.close()

     */

    rdd.foreachPartition {
      //获取RDD一个分区数据
      datas => {
        //注册驱动
        Class.forName(driver)
        //获取连接
        val conn: Connection = DriverManager.getConnection(url, username, password)
        //声明数据库 *** 作的SQL语句
        var sql: String = "insert into user(id,name,age)values(?,?,?)"
        //创建数据库 *** 作对象PrepareStatement
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //对当前分区内的数据进行遍历
        //注意:这个foreach不是算子,是集合的方法
        datas.foreach {
          case (id, name, age) => {
            ps.setInt(1, id)
            ps.setString(2, name)
            ps.setInt(3, age)
            //执行SQL
            ps.executeUpdate()
          }
        }


        //关闭连接
        ps.close()
        conn.close()
      }
    }

    //释放资源
    sc.stop()
  }
}

四、累加器

累加器:分布式共享只写变量。(Task和Task之间不能读数据)

        累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

1.系统累加器

1)代码实现

package com.spark.day06

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

/**
 * Accumulator累加器
 *  分布式共享只写变量--------task之间不能读取数据
 */
object Spark06_Accumulator {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    //创建一个RDD,对其进行求和 ,如果是单值RDD,直接对数据进行求和,内存空间占用小,不存在Shuffle的过程
    /*val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.sum())
    println(rdd.reduce(_ + _))

     */
    //val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 4), ("a", 3)))
    /*(k,v)RDD计算求和存在Shuffle 效率较低
    val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    resRDD.map(a=>a._2).collect().foreach(println)

     */
    //如果定义一个普通的遍历,那么在Driver定义,Executor会创建变量的副本,算子都是对副本进行 *** 作,Driver端的变量不会更新
    /*var sum:Int = 0
    rdd.foreach{
      case (word,count)=>{
        sum += count
      }
    }
    println(sum)//0

     */

    //如果需要通过Executor对Driver端定义的变量进行更新,需要定义累加器
    //累加器和普通的变量相比,会把Executor端的结果,收集到Driver端继续汇总

    //创建累加器
    //val sum: Accumulator[Int] = sc.accumulator(10)过期了
    val sum: LongAccumulator = sc.longAccumulator("MyAcc")
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 4), ("a", 3)),2)
    rdd.foreach{
      case (word,count)=>{
        sum.add(count)
        println(" .....  "+sum.value)
      }
    }
    println(sum.value)
    //释放资源
    sc.stop()
  }
}

        通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是 add)增加累加器的值。驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。

注意:

(1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。

(2)对于要在行动 *** 作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动 *** 作中。转化 *** 作中累加器可能会发生不止一次更新。

2.自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。

1)自定义累加器步骤

(1)继承AccumulatorV2,设定输入、输出泛型

(2)重写方法

2)需求:自定义累加器,统计集合中首字母为“H”单词出现的次数。

List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hadoop", "High")

3)代码实现

package com.spark.day06

import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Accumulator自定义累加器
 * 统计出RDD中,以"H"起始的单词
 */
object Spark07_MyAccumulator {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hadoop", "High"))

    //创建累加器对象
    val myAcc = new MyAccumulator

    //注册累加器
    sc.register(myAcc)

    //使累加器
    rdd.foreach {
      word => {
        myAcc.add(word)
      }
    }

    //输出累加器结果
    println(myAcc.value)
    //释放资源
    sc.stop()
  }
}

//定义一个类,继承AccumulatorV2
//泛型累加器输入和输出的类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  //定义一个集合,记录单词及出现次数
  var map = mutable.Map[String, Int]()

  //是否为初始状态 判断集合是否为空
  override def isZero: Boolean = map.isEmpty

  //拷贝
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val newAcc = new MyAccumulator
    newAcc.map = this.map
    newAcc
  }

  //重置    清空集合
  override def reset(): Unit = map.clear()

  //向累加器中添加元素
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0) + 1
    }

  }

  //合并
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    //当前Executor的map
    var map1 = map
    //另一个Executor的map
    var map2 = other.value

    map = map1.foldLeft(map2) {
      //mm表示map2,kv表示map1中的每一个元素
      (mm, kv) => {
        //指定合并规则
        val key: String = kv._1
        val value: Int = kv._2
        //根据map1中的key,到map2中去找value
        mm(key) = mm.getOrElse(key, 0) + value
        mm
      }
    }

  }

  //获取累加器中的值
  override def value: mutable.Map[String, Int] = map
}
五、广播变量

广播变量:分布式共享只读变量。

        在多个并行 *** 作中(Executor)使用同一个变量,Spark默认会为每个任务(Task)分别发送,这样如果共享比较大的对象,会占用很大工作节点的内存。

        广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark *** 作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。

1)使用广播变量步骤:

(1)通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象,任何可序列化的类型都可以这么实现。

(2)通过value属性访问该对象的值(在Java中为value()方法)。

(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

2)代码实现

package com.spark.day06

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 广播变量
 *  分布式共享只读变量
 */
object Spark08_Broadcast {
  def main(args: Array[String]): Unit = {
    //创建Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象,该对象时提交Spark App的入口
    val sc = new SparkContext(conf)

    //实现类似join效果 (a,(1,4),(b,(2,5)),(c,(3,6)))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val list = List(("a", 4), ("b", 5), ("c", 6))

    //list需要发往每一个task 占用内存,不建议使用
    /*val resRDD: RDD[(String, (Int, Int))] = rdd.map {
      case (k1, v1) => {
        var v3 = 0
        for ((k2, v2) <- list) {
          if (k1 == k2) {
            v3 = v2
          }
        }
        (k1, (v1, v3))
      }
    }
    resRDD.collect().foreach(println)
     */
    //创建广播变量 broadcastList会发往每个executor task之间从executor读取变量
    val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)
    val resRDD: RDD[(String, (Int, Int))] = rdd.map {
      case (k1, v1) => {
        var v3 = 0
        for ((k2, v2) <- broadcastList.value) {
          if (k1 == k2) {
            v3 = v2
          }
        }
        (k1, (v1, v3))
      }
    }

    resRDD.collect().foreach(println)

    //释放资源
    sc.stop()
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/870128.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存