spark算子

spark算子,第1张

spark算子

目录

常见转换算子

map

flatmap

mapParttions

fliter

groupByKey

reudceByKey

join

Union

sortBy

常见行为算子

collect

count

reduce

take

fliter

foreachPartition


转换算子:

RDD的转换 *** 作是:一个RDD的经过转换 *** 作后,返回一个新的RDD

转换算子都是懒执行的,你在里面写好了逻辑,单独一个这样是不会运行的,需要 *** 作算子使用了这些RDD才会执行里面的逻辑

行为算子:

行为算子用于执行计算并按指定的方式输出结果。行为算子接受 RDD,但是返回非 RDD,即输出一个值或者结果。在 RDD 执行过程中,真正的计算发生在行为算子 *** 作之前。

spark程序中有一个 *** 作算子,就会生成相同数量的job
 

懒执行的证明:

写个local模式的spark程序,通过集合创建一个rdd,用转换算子map实现:打印rdd里面的元素,扩大20倍,为了直观感受,写了一个map之前和map之后的标签

import org.apache.spark.{SparkConf, SparkContext}

object Demo2RDD {
  def main(args: Array[String]): Unit = {

    //本地测试用的连接
    val conf = new SparkConf()
    conf.setAppName("Demo2RDD")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    //通过集合创建RDD
    val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6))

    println("map之前")

    listRDD.map(i=>{
      println(i)
      i*20
    })
    println("map之后")

        
    //这里加个死循环可以让我们去spark的页面查看运行状况
    while (true){
    }
  }
}

输出结果:只有我们打印的2句话,这就是因为转换算子的懒执行,我们将计算逻辑写进了转换算子中,但是他不会执行,只有在行为算子中调用了这个rdd,才会触发转换算子的执行 

 在上面的代码中加入一个foreach的行为算子,这样就可以看到map执行了我们写进去的计算逻辑

listRDD.map(i=>{
      println(i)
      i*20
    }).foreach(println)

最终打印了我们想要的结果 

常见转换算子 map

与scala中的map方法一样接受一个函数f,返回值类型自定义

val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6))

listRDD.map(_*20).foreach(println)

//输出结果:20 40 60 80 100 120

flatmap

与scala中的flatMap方法一样,接受一个函数,要求返回值类型是集合类型,会自动会返回的集合进行展开 *** 作

我这里读取了文本数据,返回值是将每行数据通过split方法切分逗号得到的一个数组,符合要求

val linesRdd: RDD[String] = sc.textFile("spark/data/words.txt")

linesRdd.flatMap(lines=>lines.split(",")).foreach(println)

//输出结果:hello java hello world spark scala

mapParttions

这个算子的作用是向里面写入一个函数,参数是迭代器,返回值自定义

object mapPartitionDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("mapPartitionDemo")
    val sc = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/")

    linesRDD.mapPartitions((iter:Iterator[String])=>{
      println("**mapPartition**")
      iter.flatMap(line=>line.split(","))
    }).foreach(println)

    linesRDD.map(lines=>{
      println("**map**")
      lines.split(",")
    }).foreach(println)

    while(true){
    }
  }
}

 打印的数据没什么好说的,结果看起来都一样是把目标文件夹下面的单词都打印出来了

但是**mapPartition**打印了2次,**map**打印了10次

我的data文件夹下是2个文件,都是几十k的小文件,我们读取数据就会产生2个分区

mapPartition算子做了什么事呢?

他是根据分区读取数据,传入一个分区(迭代器)后,再对这个分区的数据进行处理,我用flatmap方法处理这个迭代器

map算子做了什么呢?

他是将每一行的数据读入,然后切分

在将这个打印的标签换成数据库连接的时候就可以感受到2者的差距在哪里

mapPartitionsWithIndex,和上面的方法没啥区别,就是我们可以多查看一个分区号,从0开始计数

linesRDD.mapPartitionsWithIndex((index,iter)=>{
      println(index)
      iter.flatMap(line=>line.split(","))
    }).foreach(println)
fliter

filte需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是Boolean类型

会根据函数f的返回值对数据进行过滤,如果返回true则保留数据,返回false则将数据过滤

val filterRDD: RDD[String] = linesRDD
      .filter(line => line.split(",")(4).startsWith("文科"))
filterRDD.foreach(println)

过滤出了文科班的学生 

groupByKey

和groupByKey相似的一个是groupby算子

groupby和scala中的一样,传入一个自定义函数,参数指定分组的内容,返回k-v格式的数据

val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark"))

val wordRDD = arrRDD.map(word => (word, 1))

val groupRDD = wordRDD.groupBy(kv => kv._1)

groupRDD.foreach(println)

 groupByKey:要求传入的数据是k-v格式,根据key直接为我们分好组

val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark"))

val wordRDD = arrRDD.map(word => (word, 1))

val groupRDD: RDD[(String, Iterable[Int])] = wordRDD.groupByKey()

groupRDD.foreach(println)

可以看到相比上面的groupby方法,groupByKey方法只是返回了value

reudceByKey

 reduceByKey只能作用在kv格式的数据上

相比较于groupByKey:性能更高但功能较弱

需要接收一个 聚合函数f,一般是 加和、最大值、最小值

 相当于MR中的combiner,只能使用等幂 *** 作

val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark"))

val wordRDD = arrRDD.map(word => (word, 1))

val rb_RDD = wordRDD.reduceByKey((i, j) => {
   i + j
})

rb_RDD.foreach(println)

join

join *** 作,我们首先需要准备两个rdd,这2个rdd必须是kv格式,而且key需要相同,key相同的,会把他们的value放入一个元组中作为新的value,key就是连接的key

val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))

val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))

rdd.join(rdd1).foreach(println)//输出结果(1,(dd,4)) (2,(bb,5)) (3,(aa,6))

left join/right join

join默认是inner join,有时候可能需要用到left join/right join这种 *** 作

在maysql中,如果id关联上,但是被关联一方的数据为空,是用null填充;在spark中很显然没有这种 *** 作,使用left/right join,被关联方返回的是一个Option类型的数据,如果有数据就是Some类型,没有数据就是None类型

比如我这里写的rdd1里面就没有key是6的,可以看看结果应该是 6,(cc,None)

val rdd: RDD[(Int, String)] = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
val rdd1: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd.leftOuterJoin(rdd1).foreach(println)

如果要取出Some里面的数据,用case模式匹配很好做

Union

连接相同格式的RDD

val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10))
val union_RDD: RDD[Int] = RDD1.union(RDD2)
union_RDD.foreach(println)
//输出结果:1 2 3 4 5 6 7 8 9 10

不同格式的,你在写的时候就直接报错了

sortBy

与scala中的使用相同,指什么进行排序

val RDD3: RDD[(String,Int)] = sc.parallelize(List(("one",3), ("two",2), ("three",1)))
RDD3.sortBy(kv=>kv._2).foreach(println)

 可以在sortBy里面加参数,ascending = false这样就是降序排序

常见行为算子 collect

将RDD里面的元素取出放在一个数组中

val RDD4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
val arr1: Array[Int] = RDD4.collect()
arr1.foreach(println)
//输出:1,2,3,4,5

说到collect,提一个问题就是:一个RDD中不能直接使用另一个RDD

比如我这里想要把RDD1中包含RDD2的元素挑出来,结果直接报错了

val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10))

val fil_RDD: RDD[Int] = RDD1.filter(i => {
    RDD2.collect().contains(i)
})

fil_RDD.foreach(println)


RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
 

这说的是RDD的转换以及action没有被driver调度,为什么是这样,需要了解我们写的spark程序,

我这里写的2个RDD在spark中是位于不同分区的那么就是2个task,我在这个task1中加入了又一个task2,task是发送到executor中执行的, 那么已经在executor中的task1,没有办法给task2提供正常的driver端资源调度,以及算子的执行

所以只能是在算子外把另一个RDD转成array,再放进去就没有问题

val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10))
val arr2: Array[Int]=RDD2.collect()

val fil_RDD: RDD[Int] = RDD1.filter(i => {
    arr2.contains(i)
})

fil_RDD.foreach(println)
//输出:无
count

计算RDD里面元素的个数

println(RDD4.count())
//输出:5
reduce

传入一个聚合函数,对RDD的全部数据做聚合

println(RDD4.reduce((i, j) => i + j))
take

传入一个Int类型的值,从RDD中取多少条数据返回并构建Array

val arr2 = RDD4.take(2)
arr2.foreach(println)
//1,2
fliter

与scala中一样,传入一个函数,返回值是Boolean

val RDD5: RDD[Int] = RDD4.filter(i => {
      i > 2
    })
RDD5.foreach(println)
//输出:3,4,5
foreachPartition

这是一个行为算子: 需要接收一个函数f:参数类型是Iterator类型,返回值类型为Unit * 会将每个分区的数据传给Iterator并进行最终的处理,一般用于将结果数据保存到外部系统

读取我的本地文件,向数据库中写入数据,我们之前对rdd的 *** 作都是有返回值的,但是现在向数据库写入数据,这是不需要返回值的

 

import java.sql.{DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionDemo {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
    conf.setAppName("ForeachPartitionDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    
    //设置分区为4,这个可以忽略
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt",4)

    val conn = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456")

    val ps: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)")

    linesRDD.foreach(line=>{
      val splits = line.split(",")
      val id = splits(0).toInt
      val name = splits(1)
      val age = splits(2).toInt
      val gender = splits(3)
      val clazz = splits(4)
      ps.setInt(1,id)
      ps.setString(2,name)
      ps.setInt(3,age)
      ps.setString(4,gender)
      ps.setString(5,clazz)
      ps.execute()
      ps.close()
      conn.close()
    })
  }
}


 我写入的数据是1000行,写一行数据就连接一次数据库再关闭所以上面,这样的 *** 作是不是不大合适

所以需要通过一些方法减少连接次数,我们上面的mapParttions,这个算子还有返回值,但是我们是往数据库里写数据,不需要什么返回值,这时候就用到了foreachPartition

linesRDD.foreachPartition(iter => {
      // 连接是不能被序列化的,所以连接的建立需要放入算子内部
      // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题
      // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接
      // 大大降低连接的创建次数 提高性能
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456")
      val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)")

      // 这里的foreach方法不是RDD的算子了,这里是Iterator的foreach方法
      // 不会出现连接未被序列化的问题,当前处理的分区的数据都会共用一个连接
      iter.foreach(line => {
        val splits: Array[String] = line.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)
        st.setInt(1, id)
        st.setString(2, name)
        st.setInt(3, age)
        st.setString(4, gender)
        st.setString(5, clazz)
        //        st.execute() // 相当于每条数据插入一次 性能也比较低
        st.addBatch()
      })
      st.executeBatch() // 采用批量插入的方式
      st.close()
      conn.close()
   })

最终数据写入了数据库 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688012.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存