- 一、一些架构
- 1、Spark 架构
- 二、常用算子
- 1、一些转换算子
- 2、一些执行算子
- 三、接着有的没的
- 1、checkpoint
- 2、cache
- 3、累加器 广播变量
- 四、任务调度
I know, i know
地球另一端有你陪我
一、一些架构
1、Spark 架构
Driver :
1、负责任务的调度,将 task 发送到 Excutor 上执行
2、在 yarn-cluster 模式时兼顾资源申请的功能(被 ApplicationMaster 兼并)
3、算子外部的代码会在 Driver 中执行
4、BlockManagerMaster 负责接收 Executor 中的 BlockManager 的请求并回复
Executor :
1、运行 task 的线程池(task 是一个个线程对象)
2、计算资源(CPU、内存),由 ApplicationMaster 向 ResourceManager 申请得到
3、BlockManager 负责 Executor 中的各种资源管理
(RDD 的缓存、累加器和广播变量的数据、Shuffle 产生的文件)
ConnectionManager :负责创建连接
BlockTransferService :负责获取数据
MemoryStore :负责管理内存的数据(不同于 Hbase 中的那个)
DiskStore :负责管理磁盘的数据
包含了 Scala 中的大部分常见算子,按照返回对象的类型可以分为:
转换算子: 由一个 RDD返回另一个RDD,是RDD之间的转换, 是懒执行的,需要action算子触发执行 行为算子: 由一个RDD调用,但最后没有返回新的RDD,而是返回了其他数据类型, 行为算子可以触发任务的执行,每个action算子都会触发一个job
1、一些转换算子
MapPartitions
package com.shujia import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo4MapPartitions { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo4MapPartitions") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) val lineRDD: RDD[String] = sc.textFile("Spark/data/words.txt") // 对每一个分区的数据进行处理 // 适用于在算子内部需要跟外部数据源建立连接 // (一般创建连接是为了获取数据)的情况 // 通过mapPartitions这种方式可以减少连接创建的次数,提高运行效率 lineRDD.mapPartitions((iter: Iterator[String]) => { println("map partitions") iter.flatMap(line => line.split(",")) }).foreach(println) lineRDD.mapPartitionsWithIndex((index, iter) => { println("当前的分区索引:" + index) iter.flatMap(line => line.split(",")) }).foreach(println) // 对每一条数据进行处理,假设数据有N条 // 如果需要在map中例如去请求MySQL的数据, // 那么会与MySQL建立N次连接 // 会导致运行效率较低,甚至导致MySQL建立的连接数达到上限, // 出现性能问题 lineRDD.map(line => { println("map") line.split(",") }).foreach(println) } }
ForeachPartition
package com.shujia import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import java.sql.{Connection, DriverManager, PreparedStatement} object Demo5ForeachPartition { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo5ForeachPartitions") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students数据 val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt", 4) println(linesRDD.getNumPartitions) // getNumPartitions不是算子,相当于是RDD的一个属性 // 2、建立JDBC连接 // val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456") // val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") // 3、遍历每一条数据 // 因为不需要返回值,所以选择foreach算子进行遍历 // linesRDD.foreach(line => { // // 连接是不能被序列化的,所以连接的建立需要放入算子内部 // // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题 // val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456") // val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") // // val splits: Array[String] = line.split(",") // val id: Int = splits(0).toInt // val name: String = splits(1) // val age: Int = splits(2).toInt // val gender: String = splits(3) // val clazz: String = splits(4) // st.setInt(1, id) // st.setString(2, name) // st.setInt(3, age) // st.setString(4, gender) // st.setString(5, clazz) // st.execute() // st.close() // conn.close() // }) // 可以适用foreachPartition代替foreach完成对MySQL数据的插入 // 适用于在算子内部需要跟外部数据源建立连接(一般创建连接是为了写入数据)的情况 linesRDD.foreachPartition(iter => { // 连接是不能被序列化的,所以连接的建立需要放入算子内部 // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题 // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接 // 大大降低连接的创建次数 提高性能 val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456") val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") // 这里的foreach方法实际上不是RDD的算子了,这里是Iterator的foreach方法 // 不会出现连接未被序列化的问题,当前处理的分区的数据都会共用一个连接 iter.foreach(line => { val splits: Array[String] = line.split(",") val id: Int = splits(0).toInt val name: String = splits(1) val age: Int = splits(2).toInt val gender: String = splits(3) val clazz: String = splits(4) st.setInt(1, id) st.setString(2, name) st.setInt(3, age) st.setString(4, gender) st.setString(5, clazz) // st.execute() // 相当于每条数据插入一次 性能也比较低 st.addBatch() }) st.executeBatch() // 采用批量插入的方式 st.close() conn.close() }) } }
groupByKey
(key ,(v1,v2,v3…))
package com.shujia import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo8GroupBy { def main(args: Array[String]): Unit = { // 统计班级人数 // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo8GroupBy") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students数据 val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt") // 2、将数据构建成 (班级,1) val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1)) // 3、按班级分组 val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(kv => kv._1) groupRDD.foreach(println) // 4、统计班级人数 groupRDD.map { case (clazz: String, clazzIter: Iterable[(String, Int)]) => { clazz + "," + clazzIter.map(_._2).sum } }.foreach(println) clazzRDD .groupByKey() .map(kv => (kv._1, kv._2.sum)) .foreach(println) } }
ReduceByKey
(key ,(聚合函数))
package com.shujia import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo9ReduceByKey { def main(args: Array[String]): Unit = { // 统计班级人数 // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo8GroupBy") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students数据 val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt") // 2、将数据构建成 (班级,1) K-V格式 val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1)) clazzRDD .groupByKey() .map(kv => (kv._1, kv._2.sum)) .foreach(println) clazzRDD.reduceByKey((i: Int, j: Int) => { i + j }).foreach(println) // 简写 clazzRDD.reduceByKey(_ + _).foreach(println) while (true) { } } }
MapValues
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo12MapValues { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo12MapValues") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3))) // 只能作用在K-V格式的RDD上,相当于对values进行遍历 rdd.mapValues(i => i * i).foreach(println) } }
Sort
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo13Sort { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo12MapValues") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt") // 2、按年龄排序 倒序 stuRDD .sortBy(line => line.split(",")(2), ascending = false) .foreach(println) } }
Join
package com.shujia import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object Demo10Join { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo8GroupBy") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt") val scoRDD: RDD[String] = sc.textFile("Spark/data/score.txt") // 2、将数据变成K-V格式 // Key:关联的字段 id Value:将自身作为value val stuKVRDD: RDD[(String, String)] = stuRDD.map(line => (line.split(",")(0), line.replace(",", "|"))) val scoKVRDD: RDD[(String, String)] = scoRDD.map(line => (line.split(",")(0), line.replace(",", "|"))) // (1500100113,(1500100113|羿思真|24|女|理科三班,1500100113|1000001|96)) // join只能作用在 K-V格式的RDD上,会按照K进行关联 // 返回一个二元组,v是另一个二元组(key,(v1,v2)) // join等同于inner join val joinRDD: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD) joinRDD.map { case (id: String, (stu: String, sco: String)) => // 使用 | 进行切分时需要转义 val stuSplits: Array[String] = stu.split("\|") val name: String = stuSplits(1) val clazz: String = stuSplits(4) val scoSplits: Array[String] = sco.split("\|") val sid: String = scoSplits(1) val score: String = scoSplits(2) s"$id,$name,$clazz,$sid,$score" }.foreach(println) joinRDD.map(kv => { val id: String = kv._1 val stuScoT2: (String, String) = kv._2 val stu: String = stuScoT2._1 val sco: String = stuScoT2._2 val stuSplits: Array[String] = stu.split("\|") val name: String = stuSplits(1) val clazz: String = stuSplits(4) val scoSplits: Array[String] = sco.split("\|") val sid: String = scoSplits(1) val score: String = scoSplits(2) s"$id,$name,$clazz,$sid,$score" }).foreach(println) // stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println) stuKVRDD.join(scoKVRDD).foreach(println) } }
2、一些执行算子
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo14Action { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo12MapValues") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt") // foreach 没有返回值 会触发job // 需要接收一个函数f:参数为RDD中的泛型,返回值类型为Unit stuRDD.foreach(println) // count 统计RDD中的数据条数 println(stuRDD.count()) // collect 将RDD中的数据转换成scala中的数组 // 使用时注意数据量的大小 val stuArr: Array[String] = stuRDD.collect() val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009")) val blacklistArr: Array[String] = blackListRDD.collect() stuRDD.filter(line => { // 在一个RDD中不能直接使用另一个RDD // blackListRDD.collect().contains(line.split(",")(0)) // 如果遇到了需要RDD中套另一个RDD的情况 // 可以换一种思路去实现 blacklistArr.contains(line.split(",")(0)) }).foreach(println) // reduce val sumAge: Int = stuRDD .map(line => line.split(",")(2).toInt) // 传入一个聚合函数 // select sum(age) from students group by 1 // 全局的聚合(将所有数据作为一个组进行聚合) .reduce((i, j) => i + j) println(sumAge) // saveAsTextFile 将结果保存到文件 // stuRDD.saveAsTextFile("data/stu/newStu.txt") // lookup 作用在K-V格式的RDD上,传入一个Key, // 返回所有与之匹配的Key对应的value val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0))) .lookup("尚孤风") println(ids) // take 传入一个Int类型的值,从RDD中取多少条数据返回并构建Array val stuArr2: Array[String] = stuRDD.take(10) // 这里的foreach不再是RDD的算子,而是Array的方法 stuArr2.foreach(println) } }
partition
改变分区的方法
package core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo20Partition { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("Demo20Partition") conf.setMaster("local") // 设置Spark程序默认的并行度 conf.set("spark.default.parallelism", "4") val sc: SparkContext = new SparkContext(conf) // 读取words单词数据 // 第一个RDD的分区数量由切片数量决定 val lineRDD: RDD[String] = sc.textFile("Spark/data/words") println("lineRDD的分区数量为:" + lineRDD.getNumPartitions) val reParRDD: RDD[String] = lineRDD.repartition(5) println("经过repartition过后的分区数:" + reParRDD.getNumPartitions) val coalesceRDD: RDD[String] = lineRDD.coalesce(10, shuffle = true) println("经过coalesce过后的分区数:" + coalesceRDD.getNumPartitions) val coalesceRDD2: RDD[String] = lineRDD.coalesce(1) println("第二个经过coalesce过后的分区数:" + coalesceRDD2.getNumPartitions) // Word Count // flatMap算子不会产生Shuffle // 子RDD的分区数默认等于父RDD的分区数量 val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(",")) println("wordsRDD的分区数量为:" + wordsRDD.getNumPartitions) // map算子也不会产生Shuffle 分区数默认等于父RDD的分区数量 val wordKVRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) println("wordKVRDD的分区数量为:" + wordKVRDD.getNumPartitions) val wordCntRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey(_ + _, 6) println("wordCntRDD的分区数量为:" + wordCntRDD.getNumPartitions) wordCntRDD.foreach(println) } }
三、接着有的没的
1、checkpoint
将 RDD 的数据“缓存”到 HDFS
先会完成一次计算任务,然后再往回回溯到调用了 checkpoint 的 RDD,
将其标记并重新启动一个 job,重头开始计算该标记的 RDD,最后写入 HDFS
一般用于SparkStreaming中的容错
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo17Cache { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo17Cache") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 设置checkpoint保存的目录 sc.setCheckpointDir("Spark/data/stu/checkpoint") // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt") val mapStuRDD: RDD[String] = stuRDD.map(line => { // 默认情况下 每个job会打印1000次 说明每个job都读取了一次文件 println("========student========") line }) // 对多次使用的RDD进行缓存 // 直接调用cache方法即可将数据缓存到内存中 // mapStuRDD.cache() // 如果当内存不够的时候 需要将数据放入Dick磁盘 // 需要使用persist() 方法 传入一个缓存策略级别 // mapStuRDD.persist(StorageLevel.DISK_ONLY) // 将RDD的数据缓存到磁盘 mapStuRDD.cache() // mapStuRDD.persist(StorageLevel.DISK_ONLY) mapStuRDD.checkpoint() // 统计班级人数 val clazzKVRDD: RDD[(String, Int)] = mapStuRDD .map(line => (line.split(",")(4), 1)) val clazzCnt: RDD[(String, Int)] = clazzKVRDD .reduceByKey((i, j) => i + j) clazzCnt .foreach(println) // 统计性别人数 val genderKVRDD: RDD[(String, Int)] = mapStuRDD .map(line => (line.split(",")(3), 1)) val genderCnt: RDD[(String, Int)] = genderKVRDD .reduceByKey(_ + _) genderCnt .foreach(println) // 用完记得释放 mapStuRDD.unpersist() while (true) { } } }
2、cache
不同于 checkpoint 将数据存储于 HDFS,cache 是将数据存储于自定义位置
cache 可以通过 persist 选择储存的不同等级,cache 本身相当于 MEMORY_ONLY
offHeap 堆外内存
deserialized 否序列化,选择 false 即使用序列化,会压缩数据,相对会需要CPU 资源
replication 是否需要副本
用完记得 unpersist 释放资源
3、累加器 广播变量
累加器
由于 Spark 中,算子内部(executor)和算子外部(driver)的运行位置是不同的,
一个在算子外部建立的对象,是不能直接通过网络将原对象传进算子内部,
而是会建立一个原对象的副本(replication)去 executor 供算子使用,类似值传递,
因此在算子中对对象进行的 *** 作是不能改变算子外部的原对象
Spark 针对这种情况提供了累加器 Accumulator,需要由 SparkContext 来定义
但其实功能并不多
import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object Demo18ACC { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo18ACC") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) var i: Int = 1 // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt") // 如果想要在算子内部,对外部的变量进行累加,需要使用累加器 // 在算子外部(Driver端)定义一个累加器 val acc: LongAccumulator = sc.longAccumulator val lb: ListBuffer[String] = ListBuffer[String]() stuRDD.foreach(line => { // i += 1 // lb.append(line) // 算子内部无法直接对算子外部定义的变量进行修改 // 修改的实际上都是副本 // println(i) // println(line) // 在算子内部使用 acc.add(1) }) // println("i的值是 " + i) // 在Driver进行汇总(自动汇总) println(acc.value) } }
广播变量
虽然无法修改到算子外部的表,但是还是可以拿来直接用的,毕竟还有副本
但每次使用,都会按 task 进行 replication 的拷贝,内存的负荷非常大
这里可以使用广播变量
类似小表广播,会将对象传到每一个 executor 上,相较于 task ,executor 少很多
算子内再次调用该对象就会从 executor 中去请求,一样需要由 SparkContext 来定义
import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo19Broadcast { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("Demo18ACC") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt") // 定义一个List val idsList: List[Int] = List(1500100001, 1500100011, 1500100021, 1500100031) // 如果需要在算子内部使用算子外部的变量,可以将变量进行广播 // 将变量广播到每个Executor中 提高性能 val idsListBro: Broadcast[List[Int]] = sc.broadcast(idsList) // 从stuRDD中过滤出 idsList中保存的学生 // 结果没问题 但这里会出现性能问题 // 算子内部的代码最终会被封装成Task发送到Executor中执行 // 每个Task中会封装idsList的一个副本 // Task最终都是在Executor中执行=>>没有必要每个Task放一个副本, // 可以在每个Executor上放一个副本 // 减少分发Task时网络的开销,提高效率 stuRDD.filter(line => { // 提取学生id val id: Int = line.split(",")(0).toInt // 通过id进行过滤 // 也使用了外部变量 但没有进行修改(不会生效) // idsList.contains(id) // 使用广播变量获取算子外部定义的变量 idsListBro.value.contains(id) }).foreach(println) } }
四、任务调度
流程
1、向 ResourceManager 申请启动 ApplicationMaster
2、ResourceManager 会随机选择一个 NodeManager 来启动 ApplicationMaster
3、AM 会反过来向 RM 申请一批资源(CPU,内存)启动 Executor
4、在 NM 中分配启动 Executor
5、Executor 反向注册给 Dirver(本地机器或 AM 兼并)表示已准备就绪
6、扫描 Application,遇到 action 算子时开始任务调度
7、构建 DAG 有向无环图,将 job 罗列出来。形成 DAGScheduler
8、根据宽依赖,将 job 切分成多个 stage
9、将 stage 按照顺序以 task set 的形式发送给 taskScheduler
10、taskScheduler 将 task 一个个发送到 Executor 中执行,会尽可能发送到数据所在的节点上
重试机制
1、如果 task 执行失败,taskScheduler 会重试该 task 三次
2、如果重试三次依然失败,会回退到 DAGScheduler 重试四次
注:如果是因为shuffle file not found 导致的执行失败,会直接回退到上一个 stage 进行重试
推测执行
如果有一个 task 执行很慢,taskScheduler 将该 task 发送给其他 Executor 执行
以先完成的结果为准
yarn-client 模式
yarn-cluster 模式
一个较大的区别是 Driver 端启动的位置不一样
另一个是日志的打印位置,cluster 不会将日志全部打印在本地(Driver)
可以避免数据的过量传输
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)