- 1.spark oom 调优
- 1.1 原因分析
- 1.2 driver内存不足原因与解决:
- 1、读取数据太大。
- 2.数据回传
- 3.spark框架本身的消耗
- 1.1 executor 内存不足
- 2.算子优化
- 3.参数优化
- 4.Spark shuffle优化
内存溢出oom原因两点:
1.Driver内存不足
2.executor内存不足
读取数据太大,在driver端生成了大对象,比如创建了一个大的集合数据结构。
解决方法:
- 1.把大对象从driver端加载变成从executor端加载,例如使用sc.textFile/sc.hadoopFile等
- 2.若无法避免,只能自我评估大对象的内存,增加driver内存。.–driver-memory MEM
数据回传,executor收集数据后回传到driver。例如collect。在spark中规定某个stage中从executor返回的所有数据量不能超过spark.driver.maxResultSize.。
这个参数默认1g,可以请求增加这个参数,但是这个result size是序列后的size,如果是collect,会将数据反序列收集,真正需要的内存会膨胀2-10倍。
解决方法:
-
1.不建议将大数据回传到driver端。建议将回传的 *** 作改成executor端rdd *** 作
-
2.如若无法避免, 自我评collect需要的内存, 相应增加driver-memory的值
spark master会读取每个task的 event log日志去生成spark ui界面。当你运行的task特别多的时候,就会oom
解决方法:
-
1.考虑stage的partition num数目的大小,例如从hdfs加载数据,一般partition num 是自动计算,但是当你用了filter
等 *** 作后,数据量就大大减少了,这时候应该用coalesce(shuffle=false)缩小partition num的大小, -
2.设置参数减少Ui保存的job数和stage总数,spark.ui.retainedJobs(默认1000),spark.ui.retainedStages(默认1000)
-
3.实在没法避免, 相应增加内存.–driver-memory MEM
1.Executor 内存不够有个通用的解决办法就是增加 Executor 内存 --executor-memory MEM
2.map端产生大量的对象,task内存不足,但是没有增加task内存
解决方法:
- 减少task内存消耗,减少每一个task要处理的数据量。就是要增加task数目,每一个partition对应一个task,所以要在产生大量对象的map之前增加加分区,可以使用repartition。
3.shuffle后内存溢出。
shuffle内存溢出一般是单个文件过大。
解决方法:
- 提高任务并行度。或者代码层面增加partitions的数量
4.standalone的资源分配不均匀导致内存溢出
standalone模式下如果配置了–toal-executor-cores和–executor-memory两个参数,但是没有配置–executor-cores就会导致每个executor的memory是一样的,但是cores不同。那么core多的executor就能同时执行多个task,导致内存溢出、
解决放大:同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
5. 在RDD中,共用对象能够减少OOM的情况:
这个比较特殊,这里说记录一下,遇到过一种情况,类似这样rdd.flatMap(x=>for(i <- 1 to 1000) yield (“key”,“value”))导致OOM,但是在同样的情况下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield “key”+“value”)就不会有OOM的问题,这是因为每次(“key”,“value”)都产生一个Tuple对象,而"key"+“value”,不管多少个,都只有一个对象,指向常量池。
如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够有效的减少内存使用.
- 1.使用mapPartitions代替大部分map *** 作,或者连续使用的map *** 作:整个分区的 *** 作,减少了中间结果的输出,避免了频繁的创建了对象。
- 2.Dataframe 代替 RDD任务被划分成多个 stage,在每个 stage 内部,RDD 是无法自动优化的,而 Dataframe 使用 sql 查询,自带 sql 优化器,可自动找到最优方案
- 3.broadcast join和普通join: 在大数据分布式系统中,大量数据的移动对性能的影响也是巨大的。基于这个思想,在两个RDD进行join *** 作的时候,如果其中一个RDD相对小很多,可以将小的RDD进行collect *** 作然后设置为broadcast变量,这样做之后,另一个RDD就可以使用map *** 作进行join,这样能够有效的减少相对大很多的那个RDD的数据移动。
- 4.先filter再join:这个就是谓词下推,这个很显然,filter之后再join,shuffle的数据量会减少,这里提一点是spark-sql的优化器已经对这部分有优化了,不需要用户显示的 *** 作,个人实现rdd的计算的时候需要注意这个。
(谓词下推就是指将各个条件先应用到对应的数据上,而不是根据写入的顺序执行,这样就可以先过滤掉部分数据,降低join等一系列 *** 作的数据量级,提高运算速度,代码用filter。sql用where)
- 5.partitonBy优化:如果一个RDD需要多次在join(特别是迭代)中使用,那么事先使用partitionBy对RDD进行分区,可以减少大量的shuffle.
- 6.combineByKey的使用:因为combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的。诸如
groupByKey,reduceByKey等等 - 7.在内存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。 - 8.在spark使用hbase的时候,spark和hbase搭建在同一个集群:
在spark结合hbase的使用中,spark和hbase最好搭建在同一个集群上上,或者spark的集群节点能够覆盖hbase的所有节点。hbase中的数据存储在HFile中,通常单个HFile都会比较大,另外Spark在读取Hbase的数据的时候,不是按照一个HFile对应一个RDD的分区,而是一个region对应一个RDD分区。所以在Spark读取Hbase的数据时,通常单个RDD都会比较大,如果不是搭建在同一个集群,数据移动会耗费很多的时间。
3.参数优化- spark.driver.memory (default 1g):
这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。 - spark.rdd.compress (default false):
这个参数在内存吃紧的时候,又需要persist数据有良好的性能,就可以设置这个参数为true,这样在使用persist(StorageLevel.MEMORY_ONLY_SER)的时候,就能够压缩内存中的rdd数据。减少内存消耗,就是在使用的时候会占用CPU的解压时间。
这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上 - spark.serializer (default org.apache.spark.serializer.JavaSerializer ):
建议设置为 org.apache.spark.serializer.KryoSerializer,因为KryoSerializer比JavaSerializer快,但是有可能会有些Object会序列化失败,这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册,这个时候要配置spark.kryo.registrator参数或者使用参照如下代码:
val conf=newSparkConf().setMaster(…).setAppName(…)
conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
valsc =newSparkContext(conf)
1.spark.shuffle.file.buffer
- 默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
2.spark.reducer.maxSizeInFlight
- 默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
3.spark.shuffle.io.maxRetries - 默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle *** 作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
4.spark.shuffle.io.retryWait
- 默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle *** 作的稳定性。
5.spark.shuffle.memoryFraction
- spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合 *** 作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化 *** 作,建议调高这个比例,给shuffle read的聚合 *** 作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
6.spark.shuffle.consolidateFiles
- 默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle
write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)