spark常用算子包括两大类:
- 转换算子:由一个RDD变成另一个RDD,是RDD之间的转换,是懒执行的,需要action算子触发执行
- 行为算子:由一个RDD调用,但最后没有返回新的RDD,而是返回了其他数据类型,行为算子可以触发任务的执行,每个action算子都会触发一个job
package com.xiaoming import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object demo2WordCount { def main(args: Array[String]): Unit = { //TODO 建立和Spark框架的连接 //JDBC:Connection val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount") val sc: SparkContext = new SparkContext(conf) //TODO 执行业务 *** 作 // 1、读取文件 // TODO 获取一行一行的数据 val lines: RDD[String] = sc.textFile("spark/data/words") // 将一行一行是的数据拆分,形成一个一个的单词 // 扁平化 val words: RDD[String] = lines.flatMap(_.split(",")) // 将数据根据单词进行分组,便于统计 val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word) // 对分组后的数据进行转换 val wordCount: RDD[String] = wordGroup.map(kv => kv._1 + "," + kv._2.size) // // 打印 // val arr: Array[String] = wordCount.collect() // arr.foreach(println) val cf: Configuration = new Configuration() val fs: FileSystem = FileSystem.get(cf) val path: Path = new Path("spark/data/wordCnt") // 判断输出路径是否存在 存在则删除 if (fs.exists(path)) { fs.delete(path, true) } // 将结果保存到文件 wordCount .saveAsTextFile("spark/data/wordCnt") //为了更好的在yarn上看到效果,加上死循环 while (true) { } //TODO 关闭连接 // sc.stop() } }2. Map和FlatMap
* map算子:转换算子, 是懒执行的 * 需要接收一个函数f:参数为RDD中的泛型,返回值类型自定 * 会将每一条数据依次传递个函数f进行转换 * 最终整个map方法完成后会返回一个新的RDD * flatMap算子:转换算子 * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是数据容器(集合、数组、序列、迭代器) * 会将每一条数据依次传递个函数f进行转换,还会将函数f返回的数据容器进行扁平化处理(展开) * 最终整个flatMap方法完成后会返回一个新的RDD
package com.xiaoming import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object demo3Map { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("demoMap").setMaster("local") val sc: SparkContext = new SparkContext(conf) val listRDD: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) println("map之前") val mapRDD: RDD[Int] = listRDD.map(i => { println("i=" + i) i * 20 }) println("map之后") mapRDD.foreach(println) } }
package com.xiaoming import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object demo3flatMap { def main(args: Array[String]): Unit = { //构建spark上下文环境 val conf: SparkConf = new SparkConf().setMaster("local").setAppName("demo3flatMap") val sc: SparkContext = new SparkContext(conf) val listRDD: RDD[String] = sc.parallelize(List("java,java,scala,python", "hadoop,hive,hbase", "spark,flink,MapReduce")) listRDD.foreach(println) val wordsRDD: RDD[String] = listRDD.flatMap(line=>line.split(",")) wordsRDD.foreach(println) } }
两次打印对比效果:
* mapPartitions:转换算子 * 对每一个分区的数据进行处理 * 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了获取数据)的情况 * 通过mapPartitions这种方式可以减少连接创建的次数,提高运行效率
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object demo4mapPartitions { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo4MapPartitions") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) val lineRDD: RDD[String] = sc.textFile("spark/data/word") // 对每一个分区的数据进行处理 // 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了获取数据)的情况 // 通过mapPartitions这种方式可以减少连接创建的次数,提高运行效率 lineRDD.mapPartitions((iter:Iterator[String])=>{ //打印当前分区 println("mapPartitions") iter.flatMap(line=>line.split(",")) }).foreach(println) lineRDD.mapPartitionsWithIndex((index,iter)=>{ //打印当前分区 println("当前分区索引:"+ index) iter.flatMap(line=>line.split(",")) }).foreach(println) // 可以发现mappartitions会对每一条数据进行处理,假设有N条 // 相对于map来说,即如果需要在map中请求MySQL的数据,那么就会与MySQL建立N次连接 // 这样就导致效率较低,甚至导致MySQL建立的连接次数到达上限出现性能问题 // map lineRDD.map(line=>{ println("map") line.split(",") }).foreach(println) } }
结果对比图:
* foreach、foreachPartition都是行为算子 * * foreach * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型为Unit * 会将每一条数据依次传递个函数f进行最终的一个处理,一般用于输出打印(测试) * * foreachPartition * 需要接收一个函数f:参数类型是Iterator类型,返回值类型为Unit * 会将每个分区的数据传给Iterator并进行最终的处理,一般用于将结果数据保存到外部系统
package com.xiaoming import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object demo5foreachPartition { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo5ForeachPartitions") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) //读取数据 val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") println(linesRDD.getNumPartitions) // getNumPartitions不是算子,相当于是RDD的一个属性 // 2.建立JDBC链接 // 可以适用foreachPartition代替foreach完成对MySQL数据的插入 // 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了写入数据)的情况 linesRDD.foreachPartition(iter => { // 连接是不能被序列化的,所以连接的建立需要放入算子内部 // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题 // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接 // 大大降低连接的创建次数 提高性能 val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456") val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") iter.foreach(line => { val splits: Array[String] = line.split(",") val id: Int = splits(0).toInt val name: String = splits(1) val age: Int = splits(2).toInt val gender: String = splits(3) val clazz: String = splits(4) //插入数据 st.setInt(1,id) st.setString(2,name) st.setInt(3, age) st.setString(4, gender) st.setString(5, clazz) // st.execute() 相当于每条数据插入一次 性能也比较低 st.addBatch() }) st.executeBatch()//批量插入 st.close() conn.close() }) } }5. Filter
* filter:转换算子 * 需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是Boolean类型 * 会根据函数f的返回值对数据进行过滤 * 如果返回true则保留数据,返回false则将数据过滤
package com.xiaoming import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object demo6filter { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo6Filter") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) //1. 读取数据 val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") //设置筛选条件: 筛选出理科一班的学生 val filterRDD: RDD[String] = linesRDD.filter(line =>line.split(",")(4).startsWith("理科一班")) filterRDD.foreach(println) } }6. sample
* sample:转换算子 * withReplacement:有无放回 * fraction:抽样比例(最终抽样出来的数据量大致等于抽样比例)
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object demo7Sample { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo7Sample") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students数据 val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") // 2、抽取0.01比例学生 val sampleRDD: RDD[String] = linesRDD.sample(false, 0.01) sampleRDD.foreach(println) } }7. GroupByKey
* groupBy:转换算子 * 需要指定按什么进行排名
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo8GroupBy { def main(args: Array[String]): Unit = { // 统计班级人数 // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo8GroupBy") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students数据 val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") // 2、将数据构建成 (班级,1) val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1)) // 3、按班级分组(简化) val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(_._1) groupRDD.foreach(println) // 4、统计班级人数 groupRDD.map { case (clazz: String, clazzIter: Iterable[(String, Int)]) => { clazz + "," + clazzIter.map(_._2).sum } }.foreach(println) clazzRDD.groupByKey().map(kv => (kv._1, kv._2.sum)).foreach(println) } }
* reduceByKey:转换算子、分区类算子(只能作用在K-V格式的RDD上) * 相比较于groupByKey:性能更高但功能较弱 * 需要接收一个 聚合函数f,一般是 加和、最大值、最小值 * 相当于MR中的combiner * 会在Map进行预聚合 * 只适用于幂等 *** 作 * y=f(x)=f(y)=f(f(x))
package com.xiaoming import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo9ReduceKey { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo9ReduceKey").setMaster("local") val sc: SparkContext = new SparkContext(conf) val linesRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") //构建K-V格式(班级,1) val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1)) //1.groupByKey求班级人数 clazzRDD.groupByKey.map(kv => (kv._1, kv._2.sum)).foreach(println) //2.reduceKey求班级人数 clazzRDD.reduceByKey(_+_).foreach(println) while (true) { } } }
groupByKey与reduceByKey的区别:
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo10Join { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo10Join") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") val scoRDD: RDD[String] = sc.textFile("spark/data/stu/score.txt") // 2、将数据变成K-V格式 // Key:关联的字段 id Value:将自身作为value val stuKVRDD: RDD[(String, String)] = stuRDD.map(line => (line.split(",")(0), line.replace(",", "|"))) val scoKVRDD: RDD[(String, String)] = scoRDD.map(line => (line.split(",")(0), line.replace(",", "|"))) // join只能作用在 K-V格式的RDD上,会按照K进行关联 // join等同于inner join val joinRDD: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD) //(1500100488,(1500100488|丁鸿骞|22|男|理科四班,1500100488|1000001|120)) joinRDD.map({ //模式匹配 case (id:String,(stu:String,sco:String))=> val stusplits: Array[String] = stu.split("\|") val name: String = stusplits(1) val age: String = stusplits(2) val gender: String = stusplits(3) val clazz: String = stusplits(4) val scosplits: Array[String] = sco.split("\|") val sid: String = scosplits(1) val score: String = scosplits(2) s"$id,$name,$age,$gender,$clazz,$sid,$score" }).foreach(println) // joinRDD.map(kv => { val id: String = kv._1 val stuScoT2: (String, String) = kv._2 val stu: String = stuScoT2._1 val sco: String = stuScoT2._2 val stuSplits: Array[String] = stu.split("\|") val name: String = stuSplits(1) val clazz: String = stuSplits(4) val scoSplits: Array[String] = sco.split("\|") val sid: String = scoSplits(1) val score: String = scoSplits(2) s"$id,$name,$clazz,$sid,$score" }).foreach(println) } }10. Union
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo11Union { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo11Union") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 通过集合创建RDD val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6)) val rdd2: RDD[Int] = sc.parallelize(List(7, 8, 9)) // 两个RDD union必须格式一样 rdd1.union(rdd2).foreach(println) } }11. MapValues
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo12MapValues { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo12MapValues") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3))) // 只能作用在K-V格式的RDD上,相当于对values进行遍历 rdd.mapValues(i => i * i).foreach(println) } }12. Sort
package com.xiaoming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo13Sort { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo13Sort") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt") // 2、按年龄排序 倒序 stuRDD.sortBy(line => line.split(",")(2), ascending = false).foreach(println) } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)