Spark task的输入太大如何分割?

Spark task的输入太大如何分割?,第1张

上周调优一个job 的时候发现一个join 意外的耗时间,如图上一个join的shuffle *** 作就耗时1.2h. Input 才91GB, shuffle write 525.5GB. 但是花了1.2h.

看看里面的task , Median 就是42min, max 1.2h . 虽然明显是有拖尾的现象,但是Median就42min ,不是skew 造成的。 Median 的输入是203M,Shuffle write 1171MB, Shuffle Spill(Memory) 14.6G, Shuffle Spill(Disk) 2.2GB.看上去是executor.memory 不够才造成的shuffle的spill 耗时间。

 解决思路:1. 把单个input 变小 2. 调大executor memory . 这次用第一个没有用第二个的原因是罩升这个application 有很多query,如果调大executor memory, 那么其他query 事实上不需要那么大的memory也会跟着一起用较大的executor memory setting.

首先尝试了spark.sql.files.maxPartitionBytes=33554432 , 把单个partition最大读数据量控制在32M。但是执行时间依然是这样,从history server 上看每个task 的输入数据还是200MB ,没有变化。

重新去看源码,源码:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L570

每个task接受的input 的确可以通过spark.sql.files.maxPartitionBytes 来控制,但是对于文件格式是有要求的,fsRelation.fileFormat.isSplitable为true才能根据参数分割输入,isSplittable的源码是这样:输入是否能分割和文件格式text,parquet,orc,json没有关系,只和文件格式对应的压缩算法(下面的codec)有关系。

常见的压缩算法是GZIP,LZO,SNAPPY, GZIP是不能分割的,也就是说一个文件(gz结尾)1GB,怎么设置参数,每个task的输入还是1GB.

看到这里可以写个spark单元测试来验证一下这个spark.sql.files.maxPartitionBytes,先设置成2 , 这个岁袭单元测试的用意是看在读数据的时候,spark是否把每个task的输入控制在2byte. 首先在/tmp/seq 下写了一份数据。看了下的确产生了12个partition. 然后读数据的时候看到底用了多少partitions. 看了下最后用了147个分片来读数据。说明这个参数是有效果的。

重新写个单元测试,写数据的时候用GZIP 压缩下, 看看读数据的时候能否还是用147个partition去读,跑完你就可以发现原来写的时候写了12个partition,现在读文件还是12个partition. 这个时候这个参数根本没有用

回头再看最前面的那个问题,原来之所以参数设置了不起效果,就是因为原来的表的格式是SequenceFile且它的压缩格式是GZIP。解决方法重新生成表让它不带压缩的格式或者你选个可以分割的压缩算法就能让后面读数据的时候spark.sql.files.maxPartitionBytes 生效了。来看看效果 Median 34s 即使task的个数从原来的459个变成了3655个,每个task 的输入从原来的200M变成了32M, 减小了6倍,task个数变成了原来的7倍(的确没有压缩的话表的确是在空间上是变大了一点)。 但是每个task的执行时间从原来的42min变成了31s. job 的执行时间肯定是大乎闷兄幅度下降( job 跑得快快的)

总结一下整个文章想表达的意思

1.  task 输入太大会对shuffle造成spill 到disk 的额外耗时(当然你的executor memory 如果足够大是不会出现这个问题的),请注意这个case 的输入才200M,也不大,但是对shuffle 也造成要spill到disk 的压力。原因就是输入是使用了压缩算法(GZIP), 200M的输入放到内存可以预计扩大到3~5倍(见下面每个压缩算法的压缩率),也就是600M~1G.

2. 可以通过设置spark.sql.files.maxPartitionBytes 来分割每个task 的输入。 但是配合不同的压缩算法,压缩算法是否可以被分割又决定了输入是否可以被分割。

3. 如何看你的输入是使用了什么分割算法,看分片文件的后缀。下面这个分片是GZIP格式。

part-00000-76a8013e-8a5e-4c7d-8ae6-09368920561b-c000.txt.gz。

4. 总结一下各个压缩算法的压缩率和是否可以被分割,从下图可以看到 GZIP的压缩率的确是最高的,但是GZIP是不可以分割的。

lzo 是压谨销缩文件。

一般 Linux 下面的压缩都是流压缩,也就是只能祥知游压缩一个文件。这种猛渣文件是没办法看内容的,只能直接解压缩。

图形界面双击即可。

Hadoop中的文件格式大致上分为面向行和面向列两类:

面向行:TextFile、SequenceFile、MapFile、Avro Datafile

二进制格式文件大小比文本文件大。

生产环境常隐枣用,作为原始表的存储格式,会占用更多磁盘资源,对它的 解析开销一般会比二进制格式高 几十倍以上。

Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用Hadoop 的标准的Writable 接口实现序列化和反序列化。它与Hadoop API中的MapFile 是互相册碰兼容的。

MapFile即为排序后的SequeneceFile,它会额外生成一个索引文件提供按键的查找。文件不支持复写 *** 作,不能向已存在的SequenceFile(MapFile)追加存储记录,在执行文件写 *** 作的时候,该文件是不可读取的。

Avro是一种用于支持数据密集型的二进制文件格州携谈式。它的文件格式更为紧凑,若要读取大量数据时,Avro能够提供更好的序列化和反序列化性能。并且Avro数据文件天生是带Schema定义的,所以它不需要开发者在API 级别实现自己的Writable对象。最近多个Hadoop 子项目都支持Avro 数据格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。

面向列:Parquet 、RCFile、ORCFile

RCFile是Hive推出的一种专门面向列的数据格式。 它遵循“先按列划分,再垂直划分”的设计理念。当查询过程中,针对它并不关心的列时,它会在IO上跳过这些列。

ORCFile (Optimized Record Columnar File)提供了一种比RCFile更加高效的文件格式。其内部将数据划分为默认大小为250M的Stripe。每个Stripe包括索引、数据和Footer。索引存储每一列的最大最小值,以及列中每一行的位置。

Parquet 是一种支持嵌套结构的列式存储格式。Parquet 的存储模型主要由行组(Row Group)、列块(Column Chuck)、页(Page)组成。

1、行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。

2、列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。

3、页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。

一般原始表数据使用文本格式存储,其他的都是列式存储。

目前在Hadoop中常用的几种压缩格式:lzo,gzip,snappy,bzip2,主要特性对比如下:

其性能对比如下:

2.1 lzo

hadoop中最流行的压缩格式,压缩/解压速度也比较快,合理的压缩率,支持split。适用于较大文本的处理。

对于lzo压缩,常用的有LzoCodec和lzopCodec,可以对sequenceFile和TextFile进行压缩。对TextFile压缩后,mapred对压缩后的文件默认是不能够进行split *** 作,需要对该lzo压缩文件进行index *** 作,生成lzo.index文件,map *** 作才可以进行split。如果设置LzoCodec,那么就生成.lzo后缀的文件,可以用LzoIndexer 进行支持split的index计算,如果设置LzopCodec,那么生成.lzo_deflate后缀的文件,不支持建立index。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/8180589.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-14
下一篇 2023-04-14

发表评论

登录后才能评论

评论列表(0条)

保存