在学习spark之前我们应该已经学习了Scala语法,spark是通过scala语言编写的,对scala语言的支持较好
目录
一、spark的搭建模式
二、 spark中的一些重要名词的功能及内部组成
三、RDD
转换算子(transformation)
行为算子(Action)
四、资源申请和任务调度
五、缓存cache
六、checkpoint
七、累加器和广播变量
八、分区
一、spark的搭建模式
二、 spark中的一些重要名词的功能及内部组成local:一般用于测试代码和学习的
standalone:用的是spark自身的集群,spark自身提供了计算资源,由一个主节点Master和其余的从节点Worker构成提交任务有2种方式,cient(客户端)和cluster(集群)模式,client模式的driver端在本地启动,运行日志也会在本地打印,数据量一大,所有日志拉到一台机器上导致网卡流量激增,机器可能崩溃;cluster模式的日志不会在本地打印,可以通过外部UI查看(在spark的端口:8080,查看到运行日志信息),diver端存在集群中随机的一台机器
mesos:Spark on Mesos模式中,Spark程序所需要的各种资源,都由Mesos负责调度。由于Mesos和Spark存在一定的血缘关系,因此,Spark这个框架在进行设计开发的时候,就充分考虑到了对Mesos的充分支持,因此,相对而言,Spark运行在Mesos上,要比运行在YARN上更加灵活、自然。目前,Spark官方推荐采用这种模式.
yarn:Spark可运行于YARN之上,与Hadoop进行统一部署,即“Spark on YARN”,资源管理和调度依赖YARN,分布式存储则依赖HDF
spark on yarn提交任务有也是2种方式,cient(客户端)和cluster(集群)模式,client模式的driver端在本地启动,运行日志也会在本地打印,;cluster模式的日志不会在本地打印,可以通过命令(yarn logs -applicationId 程序的id)进行查看,diver端存在集群中随机的一台机器
k8s:是一种容器化部署方式,可以让用户对应用的管理更加方便
Driver:spark程序中除了算子内部的代码都是Driver端的内容,负责资源的申请(申请多少内存,核数)和调度(发送task到executor中)
Executor:集群中每一个节点上的一个jvm进程,里面包含的内容非常丰富:
一个线程池(taskpool),用于运行发送到executor中的task
Blockmanager:4样功能,ConnectionManager,BlockTransferService,MenmoryStore,DiskStore
cpu和内存,计算资源和存储资源(Resource Manager分配)
blockmanager主要用于管理以下的三种数据
三、RDDRDD称为d性分布式数据集,是spark中的基本数据处理模型,代码中他是一个抽象类,他是一个d性的,不可变,可分区,里面的元素可并行计算的集合
下面的对RDD特点的一些补充
1.d性:存储d性:内存与磁盘自动切换
容错d性:数据丢失可以自动恢复
计算d性:计算出错可以重试机制
分片d性:根据需要可以重新分片(分区)
2.分布式:数据存储在大数据集群中的不同节点上
3.数据集:RDD不保存数据,他封装了计算逻辑,我的理解:从数据开始源头,经历第一个RDD的处理逻辑后,计算结果传递给下一个RDD,RDD本身没有数据记录。只是在使用的时候会调用里面的计算逻辑
4.数据抽象:RDD是一个抽象类,需要子类实现,比如RDD[Int],RDD[String],RDD[(String, Iterable[String])]
5.不可变:RDD的计算逻辑确定下来就不可再更改,只能重新创建新的RDD封装新逻辑
6.可分区,并行计算
rdd5大特性
对上面内容的一些补充
1.rdd的最开始的分区是由文件的切片数决定的,一个文件至少生成了一个切片,一个切片就对应了一个分区
2.每个分区都会使用RDD里面的计算逻辑,他们的计算逻辑都是相同的,一个分区对应了一个Task
3.RDD直接的依赖关系,最后一个RDD肯定是上一个RDD的基础上通过新的计算逻辑得到的,所以一个一个有向上依赖的关系。这种依赖一个对一个的称为窄依赖,与之对应的有宽依赖:一个父节点可以是多个节点的的依赖,形成一对多的情况。
4.分区器,数据进行分区的基础,保证数据得先是kv类型
5.首选位置:判断计算(task)发送到哪个节点,效果最优,如果计算节点和文件存储节点不在一块,还需要通过网络IO传输,无形中降低了性能。所以移动数据不如移动计算,文件存储在哪个节点,计算任务发到相应节点。
创建rdd的3种方法
1、parallelize:通过scala中的集合创建RDD,一般是Array和List创建的
val RDD1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5)) val RDD2: RDD[(Int, Int)] = sc.parallelize(Array((6,7),(1,2)))
2、 makeRDD,底层还是调用parallelize
val RDD3: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5)) val RDD4: RDD[(Int, Int)] = sc.makeRDD(List((6,7),(1,2)))
3、读取文件创建RDD
val stuRDD=sc.textfile("spark/data/students.txt")
4.常见的算子
转换算子(transformation)由一个算子变成另外一个算子
map:传入一个函数,对RDD中的每一条数据按照这个函数的内容处理,返回值自定义
val listRDD = sc.parallelize(List(1, 2, 3, 4, 5)) listRDD.map(_*2).foreach(println) //输出结果:2,4,6,8,10
flatMap:要求返回值类型是集合类型,会把这个返回的集合打开
val strRDD=sc.makeRDD(List("hello,world","java,scala,spark","sum,moon")) strRDD.filter(string=>string.split(",")).foreach(println) //输出:hello,world,java,scala,spark,sum,moon
filter:传入一个函数,要求返回值类型是boolean类型,true就留下这个数据,false被过滤掉
val strRDD=sc.makeRDD(List("hello,world","java,scala,spark","sum,moon")) strRDD.filter(string=>string.startsWith("h")).foreach(println) //输出: hello,world
groupby:传入一个自定义函数,函数需要指定根据什么进行分组,返回k-v格式的数据
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val groupRDD = wordRDD.groupBy(kv => kv._1) groupRDD.foreach(println)
groupByKey:要求传入的数据是k-v格式,根据key直接为我们分好组,可以看到相比上面的groupby方法,groupByKey方法只是返回了value
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val groupRDD: RDD[(String, Iterable[Int])] = wordRDD.groupByKey() groupRDD.foreach(println)
reduceByKey:只能作用在kv格式的数据上;相比较于groupByKey:性能更高但功能较弱
需要接收一个 聚合函数f,一般是 求加和、最大值、最小值;相当于MR中的combiner,只能使用等幂 *** 作,会在Map端进行预聚合,相当于combine
val arrRDD: RDD[String] = sc.makeRDD(Array("hello","java","java","hello","spark","scala","spark")) val wordRDD = arrRDD.map(word => (word, 1)) val rb_RDD = wordRDD.reduceByKey((i, j) => { i + j }) rb_RDD.foreach(println)
join:我们首先需要准备两个rdd,这2个rdd必须是kv格式,而且key需要相同,key相同的,会把他们的value放入一个元组中作为新的value,key就是连接的key
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.join(rdd1).foreach(println)//输出结果(1,(dd,4)) (2,(bb,5)) (3,(aa,6))
left join/right join:join默认是inner join,有时候可能需要用到left join/right join这种 *** 作
在maysql中,如果id关联上,但是被关联一方的数据为空,是用null填充;在spark中很显然没有这种 *** 作,使用left/right join,被关联方返回的是一个Option类型的数据,如果有数据就是Some类型,没有数据就是None类型
比如我这里写的rdd1里面就没有key是6的,可以看看结果应该是 6,(cc,None)
val rdd: RDD[(Int, String)] = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) val rdd1: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd.leftOuterJoin(rdd1).foreach(println)
如果要取出Some里面的数据,用case模式匹配很好做
union:连接相同格式的RDD
val RDD1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val RDD2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10)) val union_RDD: RDD[Int] = RDD1.union(RDD2) union_RDD.foreach(println) //输出结果:1 2 3 4 5 6 7 8 9 10
不同格式的,你在写的时候就直接报错了
sortBy:指定按照每条数据中具体什么位置进行排序
val RDD3: RDD[(String,Int)] = sc.parallelize(List(("one",3), ("two",2), ("three",1))) RDD3.sortBy(kv=>kv._2).foreach(println)
可以在sortBy里面加参数,ascending = false这样就是降序排序
mapParttions:这个算子的作用是向里面写入一个函数,参数是迭代器,返回值自定义
object mapPartitionDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("mapPartitionDemo") val sc = new SparkContext(conf) val linesRDD: RDD[String] = sc.textFile("spark/data/") linesRDD.mapPartitions((iter:Iterator[String])=>{ println("**mapPartition**") iter.flatMap(line=>line.split(",")) }).foreach(println) linesRDD.map(lines=>{ println("**map**") lines.split(",") }).foreach(println) while(true){ } } }
打印的数据没什么好说的,结果看起来都一样是把目标文件夹下面的单词都打印出来了
但是**mapPartition**打印了2次,**map**打印了10次
我的data文件夹下是2个文件,都是几十k的小文件,我们读取数据就会产生2个分区
mapPartition算子做了什么事呢?
他是根据分区读取数据,传入一个分区(迭代器)后,再对这个分区的数据进行处理,我用flatmap方法处理这个迭代器
map算子做了什么呢?
他是将每一行的数据读入,然后切分
在将这个打印的标签换成数据库连接的时候就可以感受到2者的差距在哪里
mapPartitionsWithIndex:和上面的方法没啥区别,就是我们可以多查看一个分区号,从0开始计数
linesRDD.mapPartitionsWithIndex((index,iter)=>{ println(index) iter.flatMap(line=>line.split(",")) }).foreach(println)
mapValues:要求传入kv类型数据,仅对value数据进行处理
val mapvRDD = sc.makeRDD(List(("小红",1),("小明",2))) mapvRDD.mapValues(i=>math.pow(i,2)).foreach(println) //输出(小红,1.0) (小明,4.0)
sample:用来抽样,可以传入参数为withReplacement:是否放回,franction:抽样比例(大概是这么多,但也不一定),seed:随机种子(这个不会)
//从数据源抽样,withReplacement参数指的是不放回抽样,franction是抽取比例,但不是说一个返回这么多,大概在这个范围左右 val sampleRDD: RDD[String] = linesRDD.sample(false, 0.02)行为算子(Action)
每个action算子都会产生一个job
foreach:传入一个函数,没有返回值;对rdd中的每一条数据使用这个函数,我们通常用的就是foreach(println)
take:从RDD中取出前n条数据,放进一个数组
val RDD4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val arr1: Array[Int] = RDD4.take(2) arr1.foreach(println) //输出:1,2
collect: 将RDD里面的元素取出放在一个数组中
val RDD4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val arr1: Array[Int] = RDD4.collect() arr1.foreach(println) //输出:1,2,3,4,5
count:数RDD里面有几个元素
val RDD4: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5)) println(RDD4.count()) //输出结果:5
reduce:传入一个聚合函数,对RDD的全部数据做聚合
val RDD4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) println(RDD4.reduce((i, j) => i + j)) //输出结果:15
saveasTextfile:里面传入保存路径
RDD4.saveasTextFile("spark/data/rdd4")
foreachPartition:这是一个行为算子: 需要接收一个函数f:参数类型是Iterator类型,返回值类型为Unit , 会将每个分区的数据传给Iterator并进行最终的处理,一般用于将结果数据保存到外部系统
读取我的本地文件,向数据库中写入数据,我们之前对rdd的 *** 作都是有返回值的,但是现在向数据库写入数据,这是不需要返回值的
import java.sql.{DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ForeachPartitionDemo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("ForeachPartitionDemo") conf.setMaster("local") val sc = new SparkContext(conf) //设置分区为4,这个可以忽略 val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt",4) val conn = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456") val ps: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") linesRDD.foreach(line=>{ val splits = line.split(",") val id = splits(0).toInt val name = splits(1) val age = splits(2).toInt val gender = splits(3) val clazz = splits(4) ps.setInt(1,id) ps.setString(2,name) ps.setInt(3,age) ps.setString(4,gender) ps.setString(5,clazz) ps.execute() ps.close() conn.close() }) } }
我写入的数据是1000行,写一行数据就连接一次数据库再关闭所以上面,这样的 *** 作是不是不大合适
所以需要通过一些方法减少连接次数,我们上面的mapParttions,这个算子还有返回值,但是我们是往数据库里写数据,不需要什么返回值,这时候就用到了foreachPartition
linesRDD.foreachPartition(iter => { // 连接是不能被序列化的,所以连接的建立需要放入算子内部 // foreach是针对每一条数据处理一次,相当于这里会创建1000次连接 会造成性能问题 // 对每个分区的数据进行处理,相当于每个分区建立一次连接,因为有4个分区,所以只会创建4次连接 // 大大降低连接的创建次数 提高性能 val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8&&&useSSL=false","root","123456") val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)") // 这里的foreach方法不是RDD的算子了,这里是Iterator的foreach方法 // 不会出现连接未被序列化的问题,当前处理的分区的数据都会共用一个连接 iter.foreach(line => { val splits: Array[String] = line.split(",") val id: Int = splits(0).toInt val name: String = splits(1) val age: Int = splits(2).toInt val gender: String = splits(3) val clazz: String = splits(4) st.setInt(1, id) st.setString(2, name) st.setInt(3, age) st.setString(4, gender) st.setString(5, clazz) // st.execute() // 相当于每条数据插入一次 性能也比较低 st.addBatch() }) st.executeBatch() // 采用批量插入的方式 st.close() conn.close() })
最终数据写入了数据库
四、资源申请和任务调度粗粒度的资源调度:
(1)spark在任务执行之前就将所有的资源申请下来,
(2)task后续启动不需要额外申请资源,启动速度非常快
(3)资源浪费,等到最后一个task执行结束后才会释放资源;
细粒度的资源调度:
(1)mapreduce是每一个task执行的时候单独申请资源,
(2)执行完成之后立即释放资源,资源利用充分
(3)taks启动速度慢
spark为什么比mapreduce计算要快?
1.spark是基于内存的计算
2.spark是粗粒度的资源调度,mapreduce是细粒度的资源调度
3.DAG有向无环图,spark中根据shuffle切分出stage,shuffle中间结果不需要落地,mapreduce的shuffle中间文件需要落地
spark on yarn 资源申请和任务调度总流程:
任务调度的容错机制
五、缓存cache
我们在spark代码中如果多次使用一个RDD可以考虑把他缓存起来,因为RDD并不存储数据,只是封装了计算逻辑,遇到一个action算子后,这个RDD会一直回溯到最开始的RDD一步步算过来,我们缓存还是很有用的
cache,默认是将RDD缓存到execotor的memory中,但是内存资源是非常宝贵的,我们的计算资源也需要使用内存,对于内存不足的可以使用磁盘(disk)存储;但是磁盘存储还不是我们想要的,因为在磁盘中寻找数据也需要不少时间,通常情况是2者结合使用,内存放不下了就用磁盘存储,这样保证了我们能快速获取到RDD,也保证了资源的合理分配
cache
是一个转换算子,不会触发job,需要接收
将数据放Excutor内存中,等于 persist(StorageLevel.MEMORY_ONLY)
persist
可以指定缓存级别:
1、是否放内存
2、是否放磁盘
3、是否放堆外内存
4、是否序列化
压缩
好处: 节省
缺点:序列化和反序列化需要浪费cpu执行时间
5、缓存的选择
1.内存足够:MEMORY_ONLY
2.内存不太够,尽可能多的将数据存储到内存中:MEMORY_AND_DISK_SER (数据放入磁盘中需要寻址时间,SER是序列化的意思)
3.内存不够:DISK_ONLY
6.用完记得释放缓存
对多次使用的RDD可以进行缓存,用完用unpersist方法释放
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CacheDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("CacheDemo") conf.setMaster("local") val sc = new SparkContext(conf) val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt") val stuRDD1: RDD[String] = stuRDD.map(line => { println("+++++++++") line }) //将RDD存储在内存中,指的是executor中的内存 stuRDD1.cache() //将RDD存储再磁盘中,不是默认的存储方法使用persist方法 stuRDD1.persist(StorageLevel.DISK_ONLY) //将RDD存储再内存和磁盘中 stuRDD1.persist(StorageLevel.MEMORY_AND_DISK_SER) stuRDD1.map(line=> (line.split(",")(4),1)) .reduceByKey((i,j)=>i+j) .foreach(println) stuRDD1.map(line=>(line.split(",")(3),1)) .reduceByKey((i,j)=>i+j) .foreach(println) } }
全部的缓存选项如下:
六、checkpoint1.数据备份,将RDD写入hdfs
2.action算子出发job,运行完成后,从最后一个RDD回溯,找到调用了checkpoint的RDD,再开启一个job,重新计算这个RDD,再写入HDFS
优化的地方就是把这个调用checkpoint的RDD的上一个RDD缓存了,就不用再重头计算了
3.checkpoint会切断与前面的RDD的依赖关系
代码模拟:
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CheckPointDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("CacheDemo") conf.setMaster("local") val sc = new SparkContext(conf) //在SparkContext中需要设定好checkpoint的存放位置 sc.setCheckpointDir("spark/data/checkpoint") val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt") //checkpoint的前一个RDD缓存起来 stuRDD.cache() val stuRDD1: RDD[String] = stuRDD.map(line => { println("+++++++++") line }) //直接调用checkpoint方法 stuRDD1.checkpoint() stuRDD1.map(line=> (line.split(",")(4),1)) .reduceByKey((i,j)=>i+j) .foreach(println) } }
可以看见checkpoint文件生成了
七、累加器和广播变量
累加器就是用在算子内部实现数值相加,然后在driver端可以获取到这个结果
通常我们在driver端定义一个常量,虽然在算子内部可以获取这个变量,但是那是对driver端的数据进行复制得到的,你在算子内部对这个变量重新赋值,不会对driver端造成任何的影响,
在driver端定义sc.longAccmulator,算子内部累加,driver端进行汇总
import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object AccDemo { def main(args: Array[String]): Unit = { // 构建Spark上下文环境 val conf: SparkConf = new SparkConf() conf.setAppName("AccDemo") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) var i: Int = 1 // 读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt") println(stuRDD.count())//我这里有1000条数据 //定义累加器 val acc: LongAccumulator = sc.longAccumulator stuRDD.foreach(line => { acc.add(1) }) //最终打印的结果是通过累加器的vaule方法我们获取到了值 println(acc.value)//输出结果1000 } }
广播变量
什么是广播变量?
每个算子内部的代码被封装成task发送到executor中执行,如果在算子内部使用了driver端的变量,每一个task都会复制一个副本放进task中,但是task都是在executor中运行,一个exector中可以运行多个task,没必要复制那么多个副本;所以可以将这个外部变量通过广播变量的方式发送发每个executor上,减少task的占用空间,提升任务的运行效率
广播变量在driver端创建,在executor中使用
广播变量的副本数量等于executor的数量
如果直接将数据封装task中,会产生很多副本,增加网络传输的数据量,降低效率
广播变量不能太大,超过executor的内存,导致OOM
用完记得销毁
比如这个代码,想从stuRDD中筛选出id在driver端定义的列表中人的信息
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD object BroadcastDemo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("Demo18ACC") conf.setMaster("local") val sc: SparkContext = new SparkContext(conf) // 1、读取students、scores数据 val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt") // 定义一个List val idsList: List[Int] = List(1500100001, 1500100011, 1500100021, 1500100031) // 如果需要在算子内部使用算子外部的变量,可以将变量进行广播 // 将变量广播到每个Executor中 提高性能 val idsListBro: Broadcast[List[Int]] = sc.broadcast(idsList) // 从stuRDD中过滤出 idsList中保存的学生,这个列表通常我们想直接用了算了,没有经过广播 *** 作 // Task最终都是在Executor中执行=>>没有必要每个Task放一个副本,可以在每个Executor上放一个副本 // 减少分发Task时网络的开销,提高效率 stuRDD.filter(line => { // 提取学生id val id: Int = line.split(",")(0).toInt // 通过id进行过滤 // 也使用了外部变量 但没有进行修改(不会生效) // idsList.contains(id) // 使用广播变量获取算子外部定义的变量 idsListBro.value.contains(id) }).foreach(println) } }八、分区
1.第一个RDD的分区数量通常是与文件切片的数量相同(TextInputFormat进行切片)
2.后续的RDD分区数量与父RDD的数量保持一致
3.shuffle类算子可以手动设置分区数量,读取文件的时候也可以设置一次分区
最终一个分区对于生成一个文件,我们应该按照文件大小除以block块大小得到一个数,这个数不说直接作为分区数,考虑到产生数据膨胀的可能,再给大点就比较合适了
下面的代码有设置分区的方法
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object PartitionDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("PartitionDemo") //这个在conf里面设置并行度,只有下面的代码没有对分区数进行改动才能看出来, // 因为下面一改分区,我也不知道上面到底改没改 conf.set("spark.default.parallelism","4") val sc: SparkContext = new SparkContext(conf) //我原来的分区数是2,minPartition设置为3,分区数就会变成更大的那个 //假设我的分区数原来是20,我现在设置minPartition为10,他还是20,因为原理的分区数更大 val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt",3) val wordsRDD: RDD[String] = stuRDD.flatMap(line => line.split(",")) val wordKVRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) val word_rbRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey((i, j) => {i + j},4) println(word_rbRDD.getNumPartitions) //手动改变RDD的分区数量 val rp_RDD: RDD[(String, Int)] = word_rbRDD.repartition(5) println("repartition后的分区数量:"+rp_RDD.getNumPartitions) val coa_RDD: RDD[(String, Int)] = word_rbRDD.coalesce(10) println("coalesce后的分区数量:"+coa_RDD.getNumPartitions)//4 val coa_RDD1: RDD[(String, Int)] = word_rbRDD.coalesce(3) println("coalesce后的分区数量:"+coa_RDD1.getNumPartitions)//3 val coa_RDD2: RDD[(String, Int)] = word_rbRDD.coalesce(10,true) println("coalesce后的分区数量:"+coa_RDD2.getNumPartitions)//10 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)