Spark常用算子之转换算子

Spark常用算子之转换算子,第1张

Spark常用算子之转换算子 Spark常用算子(一)转换算子

spark常用算子包括两大类:

  • 转换算子:由一个RDD变成另一个RDD,是RDD之间的转换,是懒执行的,需要action算子触发执行
  • 行为算子:由一个RDD调用,但最后没有返回新的RDD,而是返回了其他数据类型,行为算子可以触发任务的执行,每个action算子都会触发一个job
1. WordCount
package com.xiaoming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo2WordCount {
  def main(args: Array[String]): Unit = {
    //TODO 建立和Spark框架的连接
    //JDBC:Connection
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(conf)

    //TODO 执行业务 *** 作
    // 1、读取文件
    
    // TODO 获取一行一行的数据
    val lines: RDD[String] = sc.textFile("spark/data/words")
    // 将一行一行是的数据拆分,形成一个一个的单词
    // 扁平化
    val words: RDD[String] = lines.flatMap(_.split(","))
    // 将数据根据单词进行分组,便于统计
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)

    // 对分组后的数据进行转换
    val wordCount: RDD[String] = wordGroup.map(kv => kv._1 + "," + kv._2.size)

//    // 打印
//    val arr: Array[String] = wordCount.collect()
//    arr.foreach(println)

    val cf: Configuration = new Configuration()

    val fs: FileSystem = FileSystem.get(cf)
    val path: Path = new Path("spark/data/wordCnt")
    // 判断输出路径是否存在 存在则删除
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    // 将结果保存到文件
    wordCount
      .saveAsTextFile("spark/data/wordCnt")
    //为了更好的在yarn上看到效果,加上死循环
    while (true) {
    }
    //TODO 关闭连接
//    sc.stop()
  }
}
2. Map和FlatMap
  * map算子:转换算子, 是懒执行的
  * 需要接收一个函数f:参数为RDD中的泛型,返回值类型自定
  * 会将每一条数据依次传递个函数f进行转换
  * 最终整个map方法完成后会返回一个新的RDD
  
  * flatMap算子:转换算子
  * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是数据容器(集合、数组、序列、迭代器)
  * 会将每一条数据依次传递个函数f进行转换,还会将函数f返回的数据容器进行扁平化处理(展开)
  * 最终整个flatMap方法完成后会返回一个新的RDD
package com.xiaoming
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo3Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("demoMap").setMaster("local")

    val sc: SparkContext = new SparkContext(conf)
    val listRDD: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9))

    println("map之前")
    val mapRDD: RDD[Int] = listRDD.map(i => {
      println("i=" + i)
      i * 20
    })
    println("map之后")
    mapRDD.foreach(println)
  }
}
package com.xiaoming

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo3flatMap {
  def main(args: Array[String]): Unit = {
    //构建spark上下文环境
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("demo3flatMap")

    val sc: SparkContext = new SparkContext(conf)

    val listRDD: RDD[String] = sc.parallelize(List("java,java,scala,python", "hadoop,hive,hbase", "spark,flink,MapReduce"))

    listRDD.foreach(println)

    val wordsRDD: RDD[String] = listRDD.flatMap(line=>line.split(","))

    wordsRDD.foreach(println)

  }
}

两次打印对比效果:

3. mapPartitions
 * mapPartitions:转换算子
 * 对每一个分区的数据进行处理
 * 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了获取数据)的情况
 * 通过mapPartitions这种方式可以减少连接创建的次数,提高运行效率
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object demo4mapPartitions {
  def main(args: Array[String]): Unit = {
    
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo4MapPartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/word")
    // 对每一个分区的数据进行处理
    // 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了获取数据)的情况
    // 通过mapPartitions这种方式可以减少连接创建的次数,提高运行效率
    lineRDD.mapPartitions((iter:Iterator[String])=>{
      //打印当前分区
      println("mapPartitions")
      iter.flatMap(line=>line.split(","))
    }).foreach(println)

    lineRDD.mapPartitionsWithIndex((index,iter)=>{
      //打印当前分区
      println("当前分区索引:"+ index)
      iter.flatMap(line=>line.split(","))
    }).foreach(println)

    // 可以发现mappartitions会对每一条数据进行处理,假设有N条
    // 相对于map来说,即如果需要在map中请求MySQL的数据,那么就会与MySQL建立N次连接
    // 这样就导致效率较低,甚至导致MySQL建立的连接次数到达上限出现性能问题

    // map
    lineRDD.map(line=>{
      println("map")
      line.split(",")
    }).foreach(println)
  }
}

结果对比图:

4. foreachPartition
  * foreach、foreachPartition都是行为算子
  *
  * foreach
  * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型为Unit
  * 会将每一条数据依次传递个函数f进行最终的一个处理,一般用于输出打印(测试)
  *
  * foreachPartition
  * 需要接收一个函数f:参数类型是Iterator类型,返回值类型为Unit
  * 会将每个分区的数据传给Iterator并进行最终的处理,一般用于将结果数据保存到外部系统
package com.xiaoming

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo5foreachPartition {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo5ForeachPartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //读取数据
    val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    println(linesRDD.getNumPartitions)
    // getNumPartitions不是算子,相当于是RDD的一个属性

    // 2.建立JDBC链接
    

    // 可以适用foreachPartition代替foreach完成对MySQL数据的插入
    // 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了写入数据)的情况
    linesRDD.foreachPartition(iter => {
      // 连接是不能被序列化的,所以连接的建立需要放入算子内部
      // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题
      // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接
      // 大大降低连接的创建次数 提高性能
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456")
      val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)")
      iter.foreach(line => {
        val splits: Array[String] = line.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)
        //插入数据
        st.setInt(1,id)
        st.setString(2,name)
        st.setInt(3, age)
        st.setString(4, gender)
        st.setString(5, clazz)
        // st.execute() 相当于每条数据插入一次 性能也比较低
        st.addBatch()

      })
      st.executeBatch()//批量插入
      st.close()
      conn.close()
    })
  }
}

5. Filter
  * filter:转换算子
  * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是Boolean类型
  * 会根据函数f的返回值对数据进行过滤
  * 如果返回true则保留数据,返回false则将数据过滤
package com.xiaoming

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo6filter {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo6Filter")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //1. 读取数据
    val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    //设置筛选条件: 筛选出理科一班的学生
    val filterRDD: RDD[String] = linesRDD.filter(line =>line.split(",")(4).startsWith("理科一班"))
    filterRDD.foreach(println)
  }
}

6. sample
  * sample:转换算子
  * withReplacement:有无放回
  * fraction:抽样比例(最终抽样出来的数据量大致等于抽样比例)
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object demo7Sample {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo7Sample")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    // 1、读取students数据
    val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
	// 2、抽取0.01比例学生 
    val sampleRDD: RDD[String] = linesRDD.sample(false, 0.01)
    sampleRDD.foreach(println)

  }
}

7. GroupByKey
  * groupBy:转换算子
  * 需要指定按什么进行排名
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo8GroupBy {
  def main(args: Array[String]): Unit = {
    // 统计班级人数
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo8GroupBy")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 1、读取students数据
    val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    
    // 2、将数据构建成 (班级,1)
    val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1))

    // 3、按班级分组(简化)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(_._1)
    groupRDD.foreach(println)
    
    // 4、统计班级人数
    groupRDD.map {
      case (clazz: String, clazzIter: Iterable[(String, Int)]) => {
        clazz + "," + clazzIter.map(_._2).sum
      }
    }.foreach(println)

    
    
    clazzRDD.groupByKey().map(kv => (kv._1, kv._2.sum)).foreach(println)
  }
}



8. ReduceByKey
* reduceByKey:转换算子、分区类算子(只能作用在K-V格式的RDD上)
* 相比较于groupByKey:性能更高但功能较弱
* 需要接收一个 聚合函数f,一般是 加和、最大值、最小值
* 相当于MR中的combiner
* 会在Map进行预聚合
* 只适用于幂等 *** 作
* y=f(x)=f(y)=f(f(x))
package com.xiaoming

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo9ReduceKey {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo9ReduceKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    //构建K-V格式(班级,1)
    val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1))

    //1.groupByKey求班级人数
    clazzRDD.groupByKey.map(kv => (kv._1, kv._2.sum)).foreach(println)
    //2.reduceKey求班级人数

    clazzRDD.reduceByKey(_+_).foreach(println)

    while (true) {

    }
  }
}



groupByKey与reduceByKey的区别:

9. Join
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo10Join {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo10Join")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 1、读取students、scores数据
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    val scoRDD: RDD[String] = sc.textFile("spark/data/stu/score.txt")

    // 2、将数据变成K-V格式
    // Key:关联的字段 id Value:将自身作为value
    val stuKVRDD: RDD[(String, String)] = stuRDD.map(line => (line.split(",")(0), line.replace(",", "|")))
    val scoKVRDD: RDD[(String, String)] = scoRDD.map(line => (line.split(",")(0), line.replace(",", "|")))

    // join只能作用在 K-V格式的RDD上,会按照K进行关联
    // join等同于inner join
    val joinRDD: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)

    //(1500100488,(1500100488|丁鸿骞|22|男|理科四班,1500100488|1000001|120))
    joinRDD.map({
      //模式匹配
      case (id:String,(stu:String,sco:String))=>
        val stusplits: Array[String] = stu.split("\|")
        val name: String = stusplits(1)
        val age: String = stusplits(2)
        val gender: String = stusplits(3)
        val clazz: String = stusplits(4)

        val scosplits: Array[String] = sco.split("\|")
        val sid: String = scosplits(1)
        val score: String = scosplits(2)
        s"$id,$name,$age,$gender,$clazz,$sid,$score"
    }).foreach(println)

    //
    joinRDD.map(kv => {
      val id: String = kv._1
      val stuScoT2: (String, String) = kv._2
      val stu: String = stuScoT2._1
      val sco: String = stuScoT2._2

      val stuSplits: Array[String] = stu.split("\|")
      val name: String = stuSplits(1)
      val clazz: String = stuSplits(4)
      val scoSplits: Array[String] = sco.split("\|")
      val sid: String = scoSplits(1)
      val score: String = scoSplits(2)
      s"$id,$name,$clazz,$sid,$score"
    }).foreach(println)
  }
}

10. Union
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo11Union {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo11Union")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 通过集合创建RDD
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))
    val rdd2: RDD[Int] = sc.parallelize(List(7, 8, 9))

    // 两个RDD union必须格式一样
    rdd1.union(rdd2).foreach(println)
  }
}

11. MapValues
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo12MapValues {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo12MapValues")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3)))

    // 只能作用在K-V格式的RDD上,相当于对values进行遍历
    rdd.mapValues(i => i * i).foreach(println)
  }
}

12. Sort
package com.xiaoming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo13Sort {
  def main(args: Array[String]): Unit = {
    // 构建Spark上下文环境
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo13Sort")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 1、读取students、scores数据
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // 2、按年龄排序 倒序
    
    stuRDD.sortBy(line => line.split(",")(2), ascending = false).foreach(println)
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690410.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存