有关spark调优可以从三个方面(代码优化、参数优化、数据倾斜优化)进行考虑
目录- 代码优化
- 参数优化(spark提供最佳的计算位置,移动计算,不移动数据)
- 数据本地化级别
- 数据本地化级别逐渐降级
- 对数据本地化级别进行优化
- JVM、shuffle、executor堆外内存调优
- JVM
- shuffle调优
- executor堆外内存调优
- 堆外内存
- 需要调整堆外内存的情况
- 数据倾斜优化
-
避免创建重复的RDD
-
尽可能复用同一个RDD
这一条和上一条差不多,比如读取一份数据,可以从头至尾尽量都使用这份RDD,避免创建重复的RDD -
对多次使用的RDD进行持久化
首先在数据量不大,内存充足的情况下,使用MEMORY级别的策略进行存储,如果纯内存级别无法使用,并且造成了内存溢出,建议使用MEMORY_AND_DISK_SER策略,不建议使用DISK_ONLY和后缀为_2的级别,因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用(使用_2级别,都已经要备份一份数据了,那还不如重新计算重新写进HDFS一份) -
尽量避免使用shuffle类算子
尽量避免分组聚合 -
使用map-side预聚合的shuffle *** 作
必须要分组聚合,尽量用reducebykey之类的算子(有combiner端的预聚合(幂等 *** 作才有预聚合)) -
使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey (map端预聚合,减少shuffle传输的数据量)
使用mapPartitions替代普通map Transformation算子
使用foreachPartitions替代foreach Action算子
使用filter之后进行coalesce *** 作 (过滤之后,分区量减少,不需要这么多分区,用来减少分区数,减少分区不需要shuffle,所以使用coalesce)- 这里的repartition和coalesce都可以改变分区,区别在于
coalesce源码:
repartition源码:
repartition默认使用的是coalesce中shuffle = true的形式,默认会产生shuffle,从小分区到大分区肯定会产生shuffle,但从大分区到小分区的时候,可以选用coalesce方法,并将里面的shuffle调整为false,让它不产生shuffle
repartition:coalesce(numPartitions,true) 增多分区使用这个
coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition
- 这里的repartition和coalesce都可以改变分区,区别在于
-
使用repartitionAndSortWithinPartitions替代repartition与sort类 *** 作代码
-
广播大变量(主要是做mapjoin(map端的join),适用于大表关联小表的场景,广播小表,sparksql也可以广播变量)
-
使用Kryo优化序列化性能
spark使用到序列化的地方:- 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
- 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。 (比如创建一个样例类,后面用到了这个样例类的时候,它会默认自己实现序列化)
- 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
为什么要用Kryo序列化:(默认使用的是Java的序列化,改用Kryo,可以提升性能) - Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快 ,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可 以让网络传输的数据变少;在集群中耗费的内存资源大大减少。
- 对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和 反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同 时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。 官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有 使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类 型,因此对于开发者来说,这种方式比较麻烦
-
优化数据结构
-
对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
-
字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
-
集合类型,比如HashMap、linkedList等,因为集合类型内部通常会使用一些内部类来 封装集合元素,比如Map.Entry。
-
因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
-
-
使用高性能的库fastutil
进程数、内存、核数
Application任务执行流程:
在Spark Application提交后,Driver会根据action算子划分成一个个的job,然后对每一 个job划分成一个个的stage,stage内部实际上是由一系列并行计算的task组成的,然后 以TaskSet的形式提交给你TaskScheduler,TaskScheduler在进行分配之前都会计算出 每一个task最优计算位置。Spark的task的分配算法优先将task发布到数据所在的节点上 ,从而达到数据最优计算位置。
数据本地化级别:
-
PROCESS_LOCAL 进程本地化(最快,直接从内存取数据完成计算)
task需要计算的数据都在一个Executor中
-
NODE_LOCAL 节点本地化(spark计算的数据来源于HDFS,最好的数据本地化级别就是NODE_LOCAL)
task需要计算的数据同一个Worker(standalone模式,yarn模式对应nodemanager,一个nodemanager启动之后可能会启动很多executor,数据有可能会从别的executor中拿取),但不在同一个Executor中
-
NO_PREF(没有最佳位置一说,比如sparksql读取别的数据源的数据(比如MySQL在master,读取和处理数据在别的地方))
task计算的数据在同一个Worker磁盘上
-
RACK_LOCAL (机架本地化)
-
ANY (跨机架)
比如这里有一份数据,在发送到task执行之前,首先会拿到rdd数据所在的位置(block的位置),首先会根据节点发送task,task的数据本地化级别是PROCESS_LOCAL,但是发送到executor中执行之后,发现executor还有一些别的数据需要处理,所以会等待,等待三秒,重试五次,仍然无法执行,就认为这个executor的计算资源满了,就开始降低数据本地化级别到NODE_LOCAL,如果仍然无法执行,就会继续降低数据本地化的级别
调整等待时间
JVM、shuffle、executor堆外内存调优 JVM针对executor调优
概述:
Spark task执行算子函数,可能会创建很多对象,这些对象,都是要放入JVM中 ,RDD的缓存数据也会放入到堆内存中
配置:
默认spark.storage.memoryFraction缓存占比 0.6
概述:
reduceByKey:要把分布在集群各个节点上的数据中的同一个key,对应的values,都给 集中到一个节点的一个executor的一个task中,对集合起来的value执行传入的函数进行 reduce *** 作,最后变成一个value
配置
spark.shuffle.manager, 默认是sort (不管)
spark.shuffle.consolidateFiles,默认是false (不管)
spark.shuffle.file.buffer,默认是32k(shuffle的时候每次拉多少数据,这个可以调整,同一个数量级调整)
spark.shuffle.memoryFraction,默认是0.2 (executor默认是0.6,两个可以来回互相进行修改调整)
概述:
Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外 内存(netty是零拷贝),所以使用了堆外内存。
默认情况下,这个堆外内存上限默认是每一个executor的内存大小的10%;真正处理大数据的时候, 这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G (1024M),甚至说2G、4G
调节等待时长
executor在进行shuffle write,优先从自己本地关联的BlockManager中获取某份数据如果本地 block manager没有的话,那么会通过TransferService,去远程连接其他节点上executor的block manager去获取,尝试建立远程的网络连接,并且去拉取数据
频繁的让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。处于垃圾回 收过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作, 无法提供响应,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那 么这个task就失败了。
解决:–conf spark.core.connection.ack.wait.timeout=120
需要调整堆外内存的情况原因:
- Executor由于内存不足或者对外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数 据
- Executor并没有挂掉,而是在拉取数据的过程出现了问题
出错情况: - shuffle file cannot find (DAGScheduler,resubmitting task)
- executor lost
- task lost
- out of memory
解决办法: - yarn下:
–conf spark.yarn.executor.memoryOverhead=2048 单位M - standlone下:
–conf spark.executor.memoryOverhead=2048单位M
key分布不均匀
要产生shuffle
-
使用hive预处理数据
(治标不治本,hive底层是mapreduce,会解决最后的结局,一定会出一个结果,但是比较慢) -
过滤少数导致倾斜的key(将一些不需要的key过滤掉)
-
提高shuffle *** 作的并行度
-
双重聚合
先对数据打上前缀,比如0-5的前缀,从(hello,1)变成(1_hello,1),进行一个局部的聚合,之后将各个key的前缀去掉,变成(hello,2)这种形式,再进行全局聚合,得到最终的结果 -
将reduce join转为map join(大表关联小表)
(广播大变量,将数据广播到map端,因为平时的 *** 作,数据都只有在reduce端才能得到全部的数据,这里将数据广播到map端,在map端得到所有的数据;map join适合大表关联小表,广播小表) -
采样倾斜key并拆分join *** 作(双重join,适合大表关联大表)
这里表数据量大,不适合广播,只适合一个大表中有数据倾斜,另一个数据很均匀规范
当一个表有倾斜的数据的时候,找出这个表里面哪些key是倾斜的再将另一个RDD中的数据(相当于一个小表)广播出去,这地方join一次,之后再聚合,join第二次
- 使用随机前缀和扩容RDD进行join
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)