Specific Usage of Spark RDD Transformation and Action Operators

Table of Contents
  • 1. Transformation Operators
    • 1.1 map
    • 1.2 flatMap
    • 1.3 groupBy and groupByKey
    • 1.4 filter
    • 1.5 mapPartitions
    • 1.6 mapValues
    • 1.7 sort
    • 1.8 sample
    • 1.9 union
  • 2. Action Operators
    • 2.1 count, collect, reduce, save, lookup
    • 2.2 foreach and foreachPartition

1. Transformation Operators

1.1 map
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDMap {
  def main(args: Array[String]): Unit = {
    
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDdemo1")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    
    //textFile creates an RDD by reading a file; parallelize creates an RDD from a local collection
    
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    
    //println("map之前")
    val mapRDD: RDD[Int] = listRDD.map(i => {
      println("i的值" + i)
      i * 20
    })

    //println("map之后")
//    mapRDD.foreach(println)
//    listRDD.foreach(println)
    //keep only the odd numbers
    val JiShuRDD: RDD[Int] = listRDD.filter(i => {
      var flag: Boolean = false
      if (i % 2 == 1) {
        flag = true
      }
      flag
    })
    JiShuRDD.foreach(println)

    //infinite loop keeps the application from exiting (e.g. so the local Spark web UI stays available)
    while (true) {

    }
    
  }
  
}
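The commented-out print statements above hint at the key point of this example: map is a transformation and is evaluated lazily, so the function passed to map does not run until an action is triggered. A minimal sketch, assuming the same sc as above (lazyRDD is just an illustrative name):

    val lazyRDD: RDD[Int] = sc.parallelize(List(1, 2, 3)).map(i => {
      println("map running on " + i) //not printed yet: map is lazy
      i * 20
    })
    println("no map output so far")  //printed before any "map running on" line
    lazyRDD.foreach(println)         //the action finally triggers the map above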

1.2 flatMap
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDFlatmap {
  def main(args: Array[String]): Unit = {
    
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDFlatmap")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val lineRDD: RDD[String] = sc.parallelize(List("java,java,scala,python", "hadoop,hive,hbase", "spark,flink,MapReduce"))
    //split every line into words and flatten the results into a single RDD of words
    val splitsRDD: RDD[String] = lineRDD.flatMap(line => {
      line.split(",")
    })
    //group identical words together
    val groupByRDD: RDD[(String, Iterable[String])] = splitsRDD.groupBy(word => word)

    //count the occurrences of each word
    val wordcountRDD: RDD[(String, Int)] = groupByRDD.map(kv => {
      val key: String = kv._1
      val value: Iterable[String] = kv._2
      val size: Int = value.size
      (key, size)
    })

    wordcountRDD.foreach(println)
    groupByRDD.foreach(println)
    splitsRDD.foreach(println)
    lineRDD.foreach(println)
  }

}
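To make the difference between map and flatMap concrete, here is a minimal sketch reusing the lineRDD above: map keeps one array per input line, while flatMap flattens those arrays into a single RDD of words (the counts assume the three sample lines shown above):

    val nestedRDD: RDD[Array[String]] = lineRDD.map(line => line.split(","))  //one Array[String] per line
    val flatRDD: RDD[String] = lineRDD.flatMap(line => line.split(","))       //one element per word
    println(nestedRDD.count())  //3: one array per input line
    println(flatRDD.count())    //10: one element per word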

1.3 groupBy and groupByKey
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD


object SparkRDDGroupBy {
  def main(args: Array[String]): Unit = {
    

    //create the Spark context
    val conf: SparkConf = new SparkConf()

    conf.setAppName("SparkRDDGroupBy")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    //read the students data
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    //map each line to (class, 1) so that class sizes can be counted
    val clazzRDD: RDD[(String, Int)] = lineRDD.map(line => (line.split(",")(4), 1))
    //clazzRDD.foreach(println)
    //group by class
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(kv => kv._1)
    //groupRDD.foreach(println)
    //count the number of students in each class
    val sum_clazzRDD: RDD[(String, Int)] = groupRDD.map {
      case (key: String, iter: Iterable[(String, Int)]) => {
        val clazz_sum: Int = iter.map(lin => lin._2).sum
        (key, clazz_sum)

      }
    }
    //sum_clazzRDD.foreach(println)

    

    //groupByKey only works on key-value RDDs; unlike groupBy, it groups just the values for each key
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzRDD.groupByKey()
    groupByKeyRDD.map(kv => (kv._1, kv._2.sum)).foreach(println)

  }

}
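When only an aggregated value per key is needed, reduceByKey is a common alternative to groupByKey because it combines values within each partition before the shuffle. A minimal sketch reusing the clazzRDD above (reduceByKeyRDD is just an illustrative name):

    val reduceByKeyRDD: RDD[(String, Int)] = clazzRDD.reduceByKey((a, b) => a + b) //partial sums are merged before the shuffle
    reduceByKeyRDD.foreach(println)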

1.4 filter
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDFilter{
  def main(args: Array[String]): Unit = {
    
    //create the Spark context
    val conf: SparkConf = new SparkConf()

    conf.setAppName("SparkRDDFilter")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //read the students data
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    //keep only the students in science classes
    lineRDD.filter(line => {
      val splits: Array[String] = line.split(",")
      //startsWith checks whether the string begins with the given prefix
      splits(4).startsWith("理科")
    }).foreach(println)
  }
}

1.5 mapPartitions
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDMappartitions {
  def main(args: Array[String]): Unit = {
    

    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDMappartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    val lineRDD: RDD[String] = sc.textFile("spark/data/words")
    //process the data one partition at a time; there are three files here, so three partitions, and each partition is handled by its own task
    //useful when the operator needs to open a connection to an external data source inside it
    //mapPartitions reduces the number of connections that have to be created, which improves efficiency

    lineRDD.mapPartitions((iter: Iterator[String]) => {
      println("map partitions") //printed three times, once per partition
      //iterators also provide map, flatMap and similar methods
      iter.flatMap(line => {
        line.split(",")
      })
    }).foreach(println)

    //map processes the data one record at a time; suppose there are N records
    //if map needs to query MySQL (connections are usually created to fetch data), N connections would be opened
    //this lowers efficiency and can even push MySQL to its connection limit, causing performance problems
    lineRDD.map(line => {
      println("map")
      val strings: Array[String] = line.split(",")
      strings
    }).foreach(println)

    //mapPartitionsWithIndex additionally exposes the index of the current partition
    lineRDD.mapPartitionsWithIndex((index, iter) => {
      println("current partition index: " + index)
      iter.flatMap(line => line.split(","))

    }).foreach(println)
  }

}
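As a sketch of the connection-reuse pattern described in the comments above, mapPartitions lets one connection serve a whole partition. This assumes java.sql.{Connection, DriverManager} are imported and uses the same placeholder MySQL connection details as section 2.2; the enrichment step itself is left as a stub:

    lineRDD.mapPartitions(iter => {
      //one connection per partition instead of one per record
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      //materialize the results before closing the connection, because the iterator is evaluated lazily
      val enriched: List[String] = iter.map(line => {
        //...use conn here to look up or enrich each line...
        line
      }).toList
      conn.close()
      enriched.iterator
    }).foreach(println)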

1.6 mapValues
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDMapvalues {
  def main(args: Array[String]): Unit = {
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDMapvalues")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //mapValues can only be applied to key-value RDDs; it transforms each value and leaves the key untouched
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三",1),("李四",2),("王五",3)))
    rdd.mapValues(i=>i*i).foreach(println)
  }

}
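For comparison, a minimal sketch of the same computation written with map, which has to rebuild the key by hand, reusing the rdd above:

    rdd.map(kv => (kv._1, kv._2 * kv._2)).foreach(println) //same result, but the key must be carried through explicitly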

1.7 sort
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDsort {
  def main(args: Array[String]): Unit = {
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDsort")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    //sort by age in descending order
    //ascending defaults to true (ascending order)
    //note: the sort key here is a String, so the comparison is lexicographic
    stuRDD.sortBy(stu => stu.split(",")(2), ascending = false)
      .foreach(println)

  }

}
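Because the sort key above is a String, ages are compared lexicographically. A minimal sketch of a strictly numeric sort, assuming the age column always parses as an Int:

    stuRDD.sortBy(stu => stu.split(",")(2).toInt, ascending = false)
      .foreach(println)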

1.8 sample
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDSample {
  def main(args: Array[String]): Unit = {
    

    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDSample")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    //sample without replacement, keeping roughly 20% of the records (the fraction is approximate, not exact)
    val sampleRDD: RDD[String] = lineRDD.sample(false, 0.2)
    sampleRDD.foreach(println)
  }
}
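A minimal sketch of the same call with an explicit seed, which makes the sample reproducible across runs (seededSampleRDD is just an illustrative name):

    val seededSampleRDD: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.2, seed = 42L)
    seededSampleRDD.foreach(println)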

1.9 union
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDUnion {
  def main(args: Array[String]): Unit = {
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDUnion")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //create RDDs from local collections

    //the two RDDs being unioned must have the same element type
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))
    val rdd2: RDD[Int] = sc.parallelize(List(4, 5, 6, 7, 8, 9))
    rdd1.union(rdd2).foreach(println)
  }

}
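Note that union keeps duplicates (4, 5 and 6 appear in both RDDs above). A minimal sketch of removing them with distinct:

    rdd1.union(rdd2).distinct().foreach(println) //4, 5 and 6 now appear only once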

2. Action Operators

2.1 count, collect, reduce, save, lookup
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD


object SparkRDDAction {
  def main(args: Array[String]): Unit = {
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDAction")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    //read the students data
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    //foreach has no return value and triggers a job
    //it takes a function f whose parameter has the RDD's element type and whose return type is Unit
    stuRDD.foreach(println)

    
    //count returns the number of records in the RDD
    println(stuRDD.count())

    
    
    //collect pulls the RDD's data back to the driver as a local array
    val stuArr: Array[String] = stuRDD.collect()
    val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009"))
    //collect can be called on an RDD outside an operator and the resulting array can then be used inside the operator
    val ListRDD: Array[String] = blackListRDD.collect()
    stuRDD.filter(stu => {
      ListRDD.contains(stu.split(",")(0))
    }).foreach(println)

    
    //reduce takes an aggregation function
    //equivalent to: select sum(age) from students
    //a global aggregation (all the data is treated as one group)
    val ageSum: Int = stuRDD.map(line => line.split(",")(2).toInt)
      .reduce((i, j) => i + j)
    println(ageSum)

    //saveAsTextFile writes the RDD as text files to the given output path
    stuRDD.saveAsTextFile("")

    //lookup is only available on key-value RDDs; it returns all the values for the given key
    val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0)))
      .lookup("宣谷芹")
    println(ids)
  }
}


2.2 foreach and foreachPartition
package com.shujia.core

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkRDDForeach {
  def main(args: Array[String]): Unit = {
    
    //create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDForeach")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    //read the data, requesting at least 4 partitions
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt", 4)
    println(lineRDD.getNumPartitions)

    //create a MySQL connection

    //iterate over every record
    //since no return value is needed, the foreach action is used for the traversal

    //    lineRDD.foreach(line=>{
    //      //connections cannot be serialized, so the connection has to be created inside the operator
    //      //foreach processes one record at a time, so this would create 1000 connections here and cause performance problems
    //      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456")
    //      val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")
    //      val splits: Array[String] = line.split(",")
    //      val id: Int = splits(0).toInt
    //      val name: String = splits(1)
    //      val age: Int = splits(2).toInt
    //      val gender: String = splits(3)
    //      val clazz: String = splits(4)
    //      ps.setInt(1,id)
    //      ps.setString(2,name)
    //      ps.setInt(3,age)
    //      ps.setString(4,gender)
    //      ps.setString(5,clazz)
    //      ps.execute()
    //      ps.close()
    //      conn.close()
    //    })

    

    lineRDD.foreachPartition(iter => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")
      //this foreach is not the RDD operator but the Iterator's foreach method
      //there is no serialization problem, and every record in the current partition shares the same connection
      iter.foreach(line => {
        val splits: Array[String] = line.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)
        ps.setInt(1, id)
        ps.setString(2, name)
        ps.setInt(3, age)
        ps.setString(4, gender)
        ps.setString(5, clazz)
        //executing one insert per record this way would also be slow
        //ps.execute()
        ps.addBatch()
      })
      //use batch insertion instead
      ps.executeBatch()
      ps.close()
      conn.close()
    })
  }

}

Note: the data used in the code above can be obtained by contacting the author.
