spark优化

spark优化,第1张

spark优化

有关spark调优可以从三个方面(代码优化、参数优化、数据倾斜优化)进行考虑

目录
  • 代码优化
  • 参数优化(spark提供最佳的计算位置,移动计算,不移动数据)
    • 数据本地化级别
    • 数据本地化级别逐渐降级
    • 对数据本地化级别进行优化
  • JVM、shuffle、executor堆外内存调优
    • JVM
    • shuffle调优
    • executor堆外内存调优
      • 堆外内存
      • 需要调整堆外内存的情况
  • 数据倾斜优化

代码优化
  • 避免创建重复的RDD

  • 尽可能复用同一个RDD
    这一条和上一条差不多,比如读取一份数据,可以从头至尾尽量都使用这份RDD,避免创建重复的RDD

  • 对多次使用的RDD进行持久化
    首先在数据量不大,内存充足的情况下,使用MEMORY级别的策略进行存储,如果纯内存级别无法使用,并且造成了内存溢出,建议使用MEMORY_AND_DISK_SER策略,不建议使用DISK_ONLY和后缀为_2的级别,因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用(使用_2级别,都已经要备份一份数据了,那还不如重新计算重新写进HDFS一份)

  • 尽量避免使用shuffle类算子
    尽量避免分组聚合

  • 使用map-side预聚合的shuffle *** 作
    必须要分组聚合,尽量用reducebykey之类的算子(有combiner端的预聚合(幂等 *** 作才有预聚合))

  • 使用高性能的算子
    使用reduceByKey/aggregateByKey替代groupByKey (map端预聚合,减少shuffle传输的数据量)
    使用mapPartitions替代普通map Transformation算子
    使用foreachPartitions替代foreach Action算子
    使用filter之后进行coalesce *** 作 (过滤之后,分区量减少,不需要这么多分区,用来减少分区数,减少分区不需要shuffle,所以使用coalesce)

    • 这里的repartition和coalesce都可以改变分区,区别在于
      coalesce源码:

      repartition源码:

      repartition默认使用的是coalesce中shuffle = true的形式,默认会产生shuffle,从小分区到大分区肯定会产生shuffle,但从大分区到小分区的时候,可以选用coalesce方法,并将里面的shuffle调整为false,让它不产生shuffle
      repartition:coalesce(numPartitions,true) 增多分区使用这个
      coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类 *** 作代码

  • 广播大变量(主要是做mapjoin(map端的join),适用于大表关联小表的场景,广播小表,sparksql也可以广播变量)

  • 使用Kryo优化序列化性能
    spark使用到序列化的地方:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。 (比如创建一个样例类,后面用到了这个样例类的时候,它会默认自己实现序列化)
    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
      为什么要用Kryo序列化:(默认使用的是Java的序列化,改用Kryo,可以提升性能)
    • Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快 ,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可 以让网络传输的数据变少;在集群中耗费的内存资源大大减少。
    • 对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和 反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同 时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。 官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有 使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类 型,因此对于开发者来说,这种方式比较麻烦
  • 优化数据结构

    • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。

    • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

    • 集合类型,比如HashMap、linkedList等,因为集合类型内部通常会使用一些内部类来 封装集合元素,比如Map.Entry。

    • 因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。

  • 使用高性能的库fastutil

参数优化(spark提供最佳的计算位置,移动计算,不移动数据)

进程数、内存、核数

数据本地化级别

Application任务执行流程:
在Spark Application提交后,Driver会根据action算子划分成一个个的job,然后对每一 个job划分成一个个的stage,stage内部实际上是由一系列并行计算的task组成的,然后 以TaskSet的形式提交给你TaskScheduler,TaskScheduler在进行分配之前都会计算出 每一个task最优计算位置。Spark的task的分配算法优先将task发布到数据所在的节点上 ,从而达到数据最优计算位置。

数据本地化级别:

  • PROCESS_LOCAL 进程本地化(最快,直接从内存取数据完成计算)
    task需要计算的数据都在一个Executor中

  • NODE_LOCAL 节点本地化(spark计算的数据来源于HDFS,最好的数据本地化级别就是NODE_LOCAL)
    task需要计算的数据同一个Worker(standalone模式,yarn模式对应nodemanager,一个nodemanager启动之后可能会启动很多executor,数据有可能会从别的executor中拿取),但不在同一个Executor中

  • NO_PREF(没有最佳位置一说,比如sparksql读取别的数据源的数据(比如MySQL在master,读取和处理数据在别的地方))
    task计算的数据在同一个Worker磁盘上

  • RACK_LOCAL (机架本地化)

  • ANY (跨机架)

数据本地化级别逐渐降级


比如这里有一份数据,在发送到task执行之前,首先会拿到rdd数据所在的位置(block的位置),首先会根据节点发送task,task的数据本地化级别是PROCESS_LOCAL,但是发送到executor中执行之后,发现executor还有一些别的数据需要处理,所以会等待,等待三秒,重试五次,仍然无法执行,就认为这个executor的计算资源满了,就开始降低数据本地化级别到NODE_LOCAL,如果仍然无法执行,就会继续降低数据本地化的级别

对数据本地化级别进行优化

调整等待时间

JVM、shuffle、executor堆外内存调优 JVM

针对executor调优
概述:
Spark task执行算子函数,可能会创建很多对象,这些对象,都是要放入JVM中 ,RDD的缓存数据也会放入到堆内存中

配置:
默认spark.storage.memoryFraction缓存占比 0.6

shuffle调优

概述:
reduceByKey:要把分布在集群各个节点上的数据中的同一个key,对应的values,都给 集中到一个节点的一个executor的一个task中,对集合起来的value执行传入的函数进行 reduce *** 作,最后变成一个value

配置
spark.shuffle.manager, 默认是sort (不管)
spark.shuffle.consolidateFiles,默认是false (不管)
spark.shuffle.file.buffer,默认是32k(shuffle的时候每次拉多少数据,这个可以调整,同一个数量级调整)
spark.shuffle.memoryFraction,默认是0.2 (executor默认是0.6,两个可以来回互相进行修改调整)

executor堆外内存调优

概述:
Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外 内存(netty是零拷贝),所以使用了堆外内存。

堆外内存

默认情况下,这个堆外内存上限默认是每一个executor的内存大小的10%;真正处理大数据的时候, 这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G (1024M),甚至说2G、4G

调节等待时长
executor在进行shuffle write,优先从自己本地关联的BlockManager中获取某份数据如果本地 block manager没有的话,那么会通过TransferService,去远程连接其他节点上executor的block manager去获取,尝试建立远程的网络连接,并且去拉取数据

频繁的让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。处于垃圾回 收过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作, 无法提供响应,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那 么这个task就失败了。

解决:–conf spark.core.connection.ack.wait.timeout=120

需要调整堆外内存的情况

原因:

  • Executor由于内存不足或者对外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数 据
  • Executor并没有挂掉,而是在拉取数据的过程出现了问题
    出错情况:
  • shuffle file cannot find (DAGScheduler,resubmitting task)
  • executor lost
  • task lost
  • out of memory
    解决办法:
  • yarn下:
    –conf spark.yarn.executor.memoryOverhead=2048 单位M
  • standlone下:
    –conf spark.executor.memoryOverhead=2048单位M
数据倾斜优化

key分布不均匀
要产生shuffle

  • 使用hive预处理数据
    (治标不治本,hive底层是mapreduce,会解决最后的结局,一定会出一个结果,但是比较慢)

  • 过滤少数导致倾斜的key(将一些不需要的key过滤掉)

  • 提高shuffle *** 作的并行度

  • 双重聚合
    先对数据打上前缀,比如0-5的前缀,从(hello,1)变成(1_hello,1),进行一个局部的聚合,之后将各个key的前缀去掉,变成(hello,2)这种形式,再进行全局聚合,得到最终的结果

  • 将reduce join转为map join(大表关联小表)
    (广播大变量,将数据广播到map端,因为平时的 *** 作,数据都只有在reduce端才能得到全部的数据,这里将数据广播到map端,在map端得到所有的数据;map join适合大表关联小表,广播小表)

  • 采样倾斜key并拆分join *** 作(双重join,适合大表关联大表)
    这里表数据量大,不适合广播,只适合一个大表中有数据倾斜,另一个数据很均匀规范

当一个表有倾斜的数据的时候,找出这个表里面哪些key是倾斜的再将另一个RDD中的数据(相当于一个小表)广播出去,这地方join一次,之后再聚合,join第二次

  • 使用随机前缀和扩容RDD进行join

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575927.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存