- 1. Transform operators
- 1.1 map
- 1.2 flatMap
- 1.3 groupBy and groupByKey
- 1.4 filter
- 1.5 mapPartitions
- 1.6 mapValues
- 1.7 sort
- 1.8 sample
- 1.9 union
- 2. Action operators
- 2.1 count, collect, reduce, save, lookup
- 2.2 foreach and foreachPartition
1. Transform operators

1.1 map

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDMap {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDdemo1")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // textFile reads a file into an RDD; parallelize builds an RDD from a local collection
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    //println("before map")
    val mapRDD: RDD[Int] = listRDD.map(i => {
      println("value of i: " + i)
      i * 20
    })
    //println("after map")
    // mapRDD.foreach(println)
    // listRDD.foreach(println)

    // keep only the odd numbers
    val JiShuRDD: RDD[Int] = listRDD.filter(i => {
      var flag: Boolean = false
      if (i % 2 == 1) {
        flag = true
      }
      flag
    })
    JiShuRDD.foreach(println)

    // keep the application alive (e.g. to inspect the Spark web UI)
    while (true) {
    }
  }
}
```
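The commented-out println calls above hint at an important point: map and filter are lazy transformations that only describe the computation, and nothing actually runs until an action such as foreach or collect is called. A minimal sketch of that behaviour, reusing the same local SparkContext `sc`:

```scala
// Nothing is computed when map is defined; the println inside only fires once an action runs.
val lazyRDD = sc.parallelize(List(1, 2, 3)).map { i =>
  println("computing " + i)       // appears only after collect() below triggers the job
  i * 2
}
println("before the action")      // printed first
println(lazyRDD.collect().toList) // then "computing 1/2/3", then List(2, 4, 6)
```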
1.2 flatMap

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDFlatmap {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDFlatmap")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.parallelize(List("java,java,scala,python", "hadoop,hive,hbase", "spark,flink,MapReduce"))

    // flatMap splits every line and flattens the results into a single RDD of words
    val splitsRDD: RDD[String] = lineRDD.flatMap(word => {
      word.split(",")
    })

    val groupByRDD: RDD[(String, Iterable[String])] = splitsRDD.groupBy(word => word)

    // word count: the size of each group is the number of occurrences of that word
    val wordcountRDD: RDD[(String, Int)] = groupByRDD.map(kv => {
      val key: String = kv._1
      val value: Iterable[String] = kv._2
      val size: Int = value.size
      (key, size)
    })

    wordcountRDD.foreach(println)
    groupByRDD.foreach(println)
    splitsRDD.foreach(println)
    lineRDD.foreach(println)
  }
}
```
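To make the difference with map concrete, a tiny sketch (reusing the same `sc`; names are illustrative): map produces one output element per input line, while flatMap flattens each split array into individual words.

```scala
val demo = sc.parallelize(List("a,b", "c"))
println(demo.map(_.split(",")).count())     // 2: one Array per input line (RDD[Array[String]])
println(demo.flatMap(_.split(",")).count()) // 3: the arrays are flattened into a, b, c (RDD[String])
```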
1.3 groupBy and groupByKey

```scala
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkRDDGroupBy {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDGroupBy")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // read the students data
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // count students per class: map every line to (class, 1)
    val clazzRDD: RDD[(String, Int)] = lineRDD.map(line => (line.split(",")(4), 1))
    //clazzRDD.foreach(println)

    // group by class
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(kv => kv._1)
    // groupRDD.foreach(println)

    // sum the 1s inside each group to get the class size
    val sum_clazzRDD: RDD[(String, Int)] = groupRDD.map {
      case (key: String, iter: Iterable[(String, Int)]) => {
        val clazz_sum: Int = iter.map(lin => lin._2).sum
        (key, clazz_sum)
      }
    }
    //sum_clazzRDD.foreach(println)

    // groupByKey works only on key-value RDDs and keeps just the values in each group
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = clazzRDD.groupByKey()
    groupByKeyRDD.map(kv => (kv._1, kv._2.sum)).foreach(println)
  }
}
```
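The essential difference is in what ends up inside each group. A small sketch with illustrative names, reusing `sc`:

```scala
val pairs: RDD[(String, Int)] = sc.parallelize(List(("c1", 1), ("c1", 1), ("c2", 1)))
// groupBy keeps the whole record inside each group, so the key is stored again in every element
val byFunc: RDD[(String, Iterable[(String, Int)])] = pairs.groupBy(kv => kv._1)
// groupByKey keeps only the values, which is lighter when the key is not needed inside the group
val byKey: RDD[(String, Iterable[Int])] = pairs.groupByKey()
byKey.map { case (k, vs) => (k, vs.sum) }.foreach(println) // prints (c1,2) and (c2,1)
```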
1.4 filter

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDFilter {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDFilter")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // read the students data
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // keep only students in science ("理科") classes
    lineRDD.filter(line => {
      val splits: Array[String] = line.split(",")
      // startsWith tests whether the string begins with the given prefix
      splits(4).startsWith("理科")
    }).foreach(println)
  }
}
```
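filter keeps the records for which the predicate returns true, so the complementary set (students not in a science class) is just the negated predicate:

```scala
// everything the filter above dropped
lineRDD.filter(line => !line.split(",")(4).startsWith("理科")).foreach(println)
```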
1.5 mapPartitions

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDMappartitions {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDMappartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/words")

    // mapPartitions processes one whole partition at a time; there are three files here,
    // i.e. three partitions, and each partition corresponds to (at least) one task.
    // It is suited to cases where the operator needs a connection to an external data source:
    // creating the connection once per partition instead of once per record reduces the
    // number of connections created and improves performance.
    lineRDD.mapPartitions((iter: Iterator[String]) => {
      println("map partitions") // printed three times, once per partition
      // the iterator itself also has map/flatMap methods
      iter.flatMap(line => {
        line.split(",")
      })
    }).foreach(println)

    // map processes one record at a time; with N records, fetching data from MySQL inside
    // the map would open N connections, which is slow and can even exhaust the MySQL
    // connection limit and cause performance problems.
    lineRDD.map(line => {
      println("map")
      val strings: Array[String] = line.split(",")
      strings
    }).foreach(println)

    // mapPartitionsWithIndex additionally passes in the index of the current partition
    lineRDD.mapPartitionsWithIndex((index, iter) => {
      println("current partition index: " + index)
      iter.flatMap(line => line.split(","))
    }).foreach(println)
  }
}
```
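A minimal runnable sketch of the "one expensive object per partition" idea described in the comments; the seeded Random stands in for something genuinely costly such as a database connection (names and values are illustrative only):

```scala
import scala.util.Random

val data = sc.parallelize(1 to 10, numSlices = 2)
val tagged = data.mapPartitions { iter =>
  val rng = new Random(42)             // created once per partition, not once per record
  iter.map(i => (i, rng.nextInt(100))) // reused for every record in the partition
}
tagged.foreach(println)
```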
1.6 mapValues

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDMapvalues {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDMapvalues")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // mapValues only works on key-value RDDs: it transforms each value and leaves the key unchanged
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3)))
    rdd.mapValues(i => i * i).foreach(println)
  }
}
```
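For comparison, the same transformation written with map has to rebuild the tuple by hand; mapValues also retains the parent RDD's partitioning, which map does not.

```scala
// equivalent to rdd.mapValues(i => i * i)
rdd.map { case (k, v) => (k, v * v) }.foreach(println) // (张三,1), (李四,4), (王五,9)
```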
1.7 sort

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDsort {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDsort")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // sort by age, descending; ascending defaults to true
    // converting the age column to Int sorts numerically rather than lexicographically
    stuRDD.sortBy(stu => stu.split(",")(2).toInt, ascending = false)
      .foreach(println)
  }
}
```
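sortBy involves a shuffle and orders the whole RDD; when only the first few rows are of interest, the take action can be applied to the sorted result (a small sketch reusing stuRDD):

```scala
// the ten oldest students, returned to the driver as a local Array
stuRDD.sortBy(stu => stu.split(",")(2).toInt, ascending = false)
  .take(10)
  .foreach(println)
```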
1.8 sample

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDSample {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDSample")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // sample(withReplacement, fraction): draw roughly 20% of the records without replacement
    val sampleRDD: RDD[String] = lineRDD.sample(false, 0.2)
    sampleRDD.foreach(println)
  }
}
```
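The fraction is only an expected ratio, so the sample size varies from run to run; passing a seed makes the sample reproducible:

```scala
// the same 20% sample, but deterministic across runs because of the fixed seed
val seededRDD: RDD[String] = lineRDD.sample(withReplacement = false, fraction = 0.2, seed = 42L)
println(seededRDD.count())
```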
1.9 union

```scala
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDUnion {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDUnion")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // build RDDs from local collections
    // the two RDDs being unioned must have the same element type
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))
    val rdd2: RDD[Int] = sc.parallelize(List(4, 5, 6, 7, 8, 9))
    rdd1.union(rdd2).foreach(println)
  }
}
```
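Note that union simply concatenates the two RDDs and does not remove duplicates:

```scala
// 6 + 6 = 12 elements in total; 4, 5 and 6 each appear twice
println(rdd1.union(rdd2).count()) // 12
```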
2. Action operators

2.1 count, collect, reduce, save, lookup

```scala
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkRDDAction {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDAction")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // read the students data
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    // foreach has no return value and triggers a job;
    // it takes a function f whose parameter is the RDD's element type and whose return type is Unit
    stuRDD.foreach(println)

    // count returns the number of records
    println(stuRDD.count())

    // collect pulls the whole RDD back to the driver as a local array
    val stuArr: Array[String] = stuRDD.collect()

    val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009"))
    // collect can be called outside an operator and the resulting local array used inside it
    val ListRDD: Array[String] = blackListRDD.collect()
    stuRDD.filter(stu => {
      ListRDD.contains(stu.split(",")(0))
    }).foreach(println)

    // reduce takes an aggregation function
    // roughly: select sum(age) from students
    // a global aggregation (all records are aggregated as a single group)
    val ageSum: Int = stuRDD.map(line => line.split(",")(2).toInt)
      .reduce((i, j) => i + j)
    println(ageSum)

    // saveAsTextFile writes the RDD out as text files under the given directory
    // (the path is left empty in the original code)
    stuRDD.saveAsTextFile("")

    // lookup returns all values of a key-value RDD for the given key
    val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0)))
      .lookup("宣谷芹")
    println(ids)
  }
}
```
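A small self-contained recap of these actions on a toy RDD (assuming the same `sc`), with the results shown in comments:

```scala
val nums: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
println(nums.count())                 // 4
println(nums.collect().mkString(",")) // 1,2,3,4
println(nums.reduce(_ + _))           // 10
```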
2.2 foreach and foreachPartition

```scala
package com.shujia.core

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDDForeach {
  def main(args: Array[String]): Unit = {
    // create the Spark context
    val conf: SparkConf = new SparkConf()
    conf.setAppName("SparkRDDForeach")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // read the data with 4 partitions
    val lineRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt", 4)
    println(lineRDD.getNumPartitions)

    // Write every record to MySQL.
    // Since no return value is needed, the foreach action would be the natural choice:
    // lineRDD.foreach(line => {
    //   // a connection cannot be serialized, so it has to be created inside the operator;
    //   // foreach handles one record at a time, so 1000 records mean 1000 connections,
    //   // which causes performance problems
    //   val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
    //   val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")
    //   val splits: Array[String] = line.split(",")
    //   val id: Int = splits(0).toInt
    //   val name: String = splits(1)
    //   val age: Int = splits(2).toInt
    //   val gender: String = splits(3)
    //   val clazz: String = splits(4)
    //   ps.setInt(1, id)
    //   ps.setString(2, name)
    //   ps.setInt(3, age)
    //   ps.setString(4, gender)
    //   ps.setString(5, clazz)
    //   ps.execute()
    //   ps.close()
    //   conn.close()
    // })

    // foreachPartition creates one connection per partition instead
    lineRDD.foreachPartition(iter => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      val ps: PreparedStatement = conn.prepareStatement("insert into student2 values(?,?,?,?,?)")
      // this foreach is a method of the Iterator, not an RDD operator, so there is no
      // serialization problem and every record in the partition shares one connection
      iter.foreach(line => {
        val splits: Array[String] = line.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)
        ps.setInt(1, id)
        ps.setString(2, name)
        ps.setInt(3, age)
        ps.setString(4, gender)
        ps.setString(5, clazz)
        // executing one insert per record would still be slow:
        // ps.execute()
        ps.addBatch()
      })
      // insert in one batch instead
      ps.executeBatch()
      ps.close()
      conn.close()
    })
  }
}
```
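One refinement not in the original code: when a partition is very large, the batch held by the PreparedStatement grows with it, so it is common to flush every few thousand rows instead of once per partition. A minimal sketch of the idea, using a local buffer in place of the real JDBC batch (the threshold and names are illustrative):

```scala
lineRDD.foreachPartition { iter =>
  val buffer = scala.collection.mutable.ArrayBuffer[String]()
  iter.foreach { line =>
    buffer += line
    if (buffer.size >= 1000) {
      // in the JDBC version: ps.executeBatch()
      buffer.clear()
    }
  }
  // flush whatever is left over
  buffer.clear()
}
```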
Note: the data used in the code can be obtained by contacting the author.