目录
常见转换算子
map
flatmap
mapParttions
fliter
groupByKey
reudceByKey
join
Union
sortBy
常见行为算子
collect
count
reduce
take
fliter
foreachPartition
转换算子:
RDD的转换 *** 作是:一个RDD的经过转换 *** 作后,返回一个新的RDD
转换算子都是懒执行的,你在里面写好了逻辑,单独一个这样是不会运行的,需要 *** 作算子使用了这些RDD才会执行里面的逻辑
行为算子:
行为算子用于执行计算并按指定的方式输出结果。行为算子接受 RDD,但是返回非 RDD,即输出一个值或者结果。在 RDD 执行过程中,真正的计算发生在行为算子 *** 作之前。
spark程序中有一个 *** 作算子,就会生成相同数量的job
懒执行的证明:
写个local模式的spark程序,通过集合创建一个rdd,用转换算子map实现:打印rdd里面的元素,扩大20倍,为了直观感受,写了一个map之前和map之后的标签
import org.apache.spark.{SparkConf, SparkContext} object Demo2RDD { def main(args: Array[String]): Unit = { //本地测试用的连接 val conf = new SparkConf() conf.setAppName("Demo2RDD") conf.setMaster("local") val sc = new SparkContext(conf) //通过集合创建RDD val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6)) println("map之前") listRDD.map(i=>{ println(i) i*20 }) println("map之后") //这里加个死循环可以让我们去spark的页面查看运行状况 while (true){ } } }
输出结果:只有我们打印的2句话,这就是因为转换算子的懒执行,我们将计算逻辑写进了转换算子中,但是他不会执行,只有在行为算子中调用了这个rdd,才会触发转换算子的执行
在上面的代码中加入一个foreach的行为算子,这样就可以看到map执行了我们写进去的计算逻辑
listRDD.map(i=>{ println(i) i*20 }).foreach(println)
最终打印了我们想要的结果
常见转换算子 map与scala中的map方法一样接受一个函数f,返回值类型自定义
val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6)) listRDD.map(_*20).foreach(println) //输出结果:20 40 60 80 100 120flatmap
与scala中的flatMap方法一样,接受一个函数,要求返回值类型是集合类型,会自动会返回的集合进行展开 *** 作
我这里读取了文本数据,返回值是将每行数据通过split方法切分逗号得到的一个数组,符合要求
val linesRdd: RDD[String] = sc.textFile("spark/data/words.txt") linesRdd.flatMap(lines=>lines.split(",")).foreach(println) //输出结果:hello java hello world spark scalamapParttions
这个算子的作用是向里面写入一个函数,参数是迭代器,返回值自定义
object mapPartitionDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("mapPartitionDemo") val sc = new SparkContext(conf) val linesRDD: RDD[String] = sc.textFile("spark/data/") linesRDD.mapPartitions((iter:Iterator[String])=>{ println("**mapPartition**") iter.flatMap(line=>line.split(",")) }).foreach(println) linesRDD.map(lines=>{ println("**map**") lines.split(",") }).foreach(println) while(true){ } } }
打印的数据没什么好说的,结果看起来都一样是把目标文件夹下面的单词都打印出来了
但是**mapPartition**打印了2次,**map**打印了10次
我的data文件夹下是2个文件,都是几十k的小文件,我们读取数据就会产生2个分区
mapPartition算子做了什么事呢?
他是根据分区读取数据,传入一个分区(迭代器)后,再对这个分区的数据进行处理,我用flatmap方法处理这个迭代器
map算子做了什么呢?
他是将每一行的数据读入,然后切分
在将这个打印的标签换成数据库连接的时候就可以感受到2者的差距在哪里
mapPartitionsWithIndex,和上面的方法没啥区别,就是我们可以多查看一个分区号,从0开始计数
linesRDD.mapPartitionsWithIndex((index,iter)=>{ println(index) iter.flatMap(line=>line.split(",")) }).foreach(println)fliter
filte需要接收一个函数f:参数类型同RDD中的泛型,返回值类型是Boolean类型
会根据函数f的返回值对数据进行过滤,如果返回true则保留数据,返回false则将数据过滤
val filterRDD: RDD[String] = linesRDD .filter(line => line.split(",")(4).startsWith("文科")) filterRDD.foreach(println)
过滤出了文科班的学生
groupByKey和groupByKey相似的一个是groupby算子
groupby和scala中的一样,传入一个自定义函数,参数指定分组的内容,返回k-v格式的数据
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val groupRDD = wordRDD.groupBy(kv => kv._1) groupRDD.foreach(println)
groupByKey:要求传入的数据是k-v格式,根据key直接为我们分好组
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val groupRDD: RDD[(String, Iterable[Int])] = wordRDD.groupByKey() groupRDD.foreach(println)
可以看到相比上面的groupby方法,groupByKey方法只是返回了value
reudceByKeyreduceByKey只能作用在kv格式的数据上
相比较于groupByKey:性能更高但功能较弱
需要接收一个 聚合函数f,一般是 加和、最大值、最小值
相当于MR中的combiner,只能使用等幂 *** 作
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val rb_RDD = wordRDD.reduceByKey((i, j) => { i + j }) rb_RDD.foreach(println)join
join *** 作,我们首先需要准备两个rdd,这2个rdd必须是kv格式,而且key需要相同,key相同的,会把他们的value放入一个元组中作为新的value,key就是连接的key
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.join(rdd1).foreach(println)//输出结果(1,(dd,4)) (2,(bb,5)) (3,(aa,6))
left join/right join
join默认是inner join,有时候可能需要用到left join/right join这种 *** 作
在maysql中,如果id关联上,但是被关联一方的数据为空,是用null填充;在spark中很显然没有这种 *** 作,使用left/right join,被关联方返回的是一个Option类型的数据,如果有数据就是Some类型,没有数据就是None类型
比如我这里写的rdd1里面就没有key是6的,可以看看结果应该是 6,(cc,None)
val rdd: RDD[(Int, String)] = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) val rdd1: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.leftOuterJoin(rdd1).foreach(println)
如果要取出Some里面的数据,用case模式匹配很好做
Union连接相同格式的RDD
val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10)) val union_RDD: RDD[Int] = RDD1.union(RDD2) union_RDD.foreach(println) //输出结果:1 2 3 4 5 6 7 8 9 10
不同格式的,你在写的时候就直接报错了
sortBy与scala中的使用相同,指什么进行排序
val RDD3: RDD[(String,Int)] = sc.parallelize(List(("one",3), ("two",2), ("three",1))) RDD3.sortBy(kv=>kv._2).foreach(println)
可以在sortBy里面加参数,ascending = false这样就是降序排序
常见行为算子 collect将RDD里面的元素取出放在一个数组中
val RDD4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val arr1: Array[Int] = RDD4.collect() arr1.foreach(println) //输出:1,2,3,4,5
说到collect,提一个问题就是:一个RDD中不能直接使用另一个RDD
比如我这里想要把RDD1中包含RDD2的元素挑出来,结果直接报错了
val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10)) val fil_RDD: RDD[Int] = RDD1.filter(i => { RDD2.collect().contains(i) }) fil_RDD.foreach(println)
RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
这说的是RDD的转换以及action没有被driver调度,为什么是这样,需要了解我们写的spark程序,
我这里写的2个RDD在spark中是位于不同分区的那么就是2个task,我在这个task1中加入了又一个task2,task是发送到executor中执行的, 那么已经在executor中的task1,没有办法给task2提供正常的driver端资源调度,以及算子的执行
所以只能是在算子外把另一个RDD转成array,再放进去就没有问题
val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10)) val arr2: Array[Int]=RDD2.collect() val fil_RDD: RDD[Int] = RDD1.filter(i => { arr2.contains(i) }) fil_RDD.foreach(println) //输出:无count
计算RDD里面元素的个数
println(RDD4.count()) //输出:5reduce
传入一个聚合函数,对RDD的全部数据做聚合
println(RDD4.reduce((i, j) => i + j))take
传入一个Int类型的值,从RDD中取多少条数据返回并构建Array
val arr2 = RDD4.take(2) arr2.foreach(println) //1,2fliter
与scala中一样,传入一个函数,返回值是Boolean
val RDD5: RDD[Int] = RDD4.filter(i => { i > 2 }) RDD5.foreach(println) //输出:3,4,5foreachPartition
这是一个行为算子: 需要接收一个函数f:参数类型是Iterator类型,返回值类型为Unit * 会将每个分区的数据传给Iterator并进行最终的处理,一般用于将结果数据保存到外部系统
读取我的本地文件,向数据库中写入数据,我们之前对rdd的 *** 作都是有返回值的,但是现在向数据库写入数据,这是不需要返回值的
import java.sql.{DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ForeachPartitionDemo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("ForeachPartitionDemo") conf.setMaster("local") val sc = new SparkContext(conf) //设置分区为4,这个可以忽略 val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt",4) val conn = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456") val ps: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") linesRDD.foreach(line=>{ val splits = line.split(",") val id = splits(0).toInt val name = splits(1) val age = splits(2).toInt val gender = splits(3) val clazz = splits(4) ps.setInt(1,id) ps.setString(2,name) ps.setInt(3,age) ps.setString(4,gender) ps.setString(5,clazz) ps.execute() ps.close() conn.close() }) } }
我写入的数据是1000行,写一行数据就连接一次数据库再关闭所以上面,这样的 *** 作是不是不大合适
所以需要通过一些方法减少连接次数,我们上面的mapParttions,这个算子还有返回值,但是我们是往数据库里写数据,不需要什么返回值,这时候就用到了foreachPartition
linesRDD.foreachPartition(iter => { // 连接是不能被序列化的,所以连接的建立需要放入算子内部 // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题 // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接 // 大大降低连接的创建次数 提高性能 val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456") val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") // 这里的foreach方法不是RDD的算子了,这里是Iterator的foreach方法 // 不会出现连接未被序列化的问题,当前处理的分区的数据都会共用一个连接 iter.foreach(line => { val splits: Array[String] = line.split(",") val id: Int = splits(0).toInt val name: String = splits(1) val age: Int = splits(2).toInt val gender: String = splits(3) val clazz: String = splits(4) st.setInt(1, id) st.setString(2, name) st.setInt(3, age) st.setString(4, gender) st.setString(5, clazz) // st.execute() // 相当于每条数据插入一次 性能也比较低 st.addBatch() }) st.executeBatch() // 采用批量插入的方式 st.close() conn.close() })
最终数据写入了数据库
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)