如何避免Spark SQL做数据导入时产生大量小文件

如何避免Spark SQL做数据导入时产生大量小文件,第1张

生产上,我们往往将Spark SQL作为Hive的替代方案,来获得SQL on Hadoop更出色的性能。因此,本文所讲的是指存储于HDFS中小文件,即指文件的大小远小于HDFS上块(dfs.block.size)大小的文件。

比如我们拿TPCDS测试集中的store_sales进行举例, sql如下所示

首先我们得到其执行计划,如下所示,

store_sales的原生文件包含1616逻辑分片,对应生成1616 个Spark Task,插入动态分区表之后生成1824个数据分区加一个NULL值的分区,每个分区下都有可能生成1616个文件,这种情况下,最终的文件数量极有可能达到2949200。1T的测试集store_sales也就大概300g,这种情况每个文件可能就零点几M。

比如,为了防止Shuffle阶段的数据倾斜我们可以在上面的sql中加上 distribute by rand() ,这样我们的执行计划就变成了,

这种情况下,这样我们的文件数妥妥的就是spark.sql.shuffle.partitions * N,因为rand函数一般会把数据打散的非常均匀。当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

最理想的情况,当然是根据分区字段进行shuffle,在上面的sql中加上 distribute by ss_sold_date_sk 。 把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件,在我们的case中store_sales,在1825个分区下各种生成了一个数据文件。

但是这种情况下也容易出现数据倾斜的问题,比如双11的销售数据就很容易在这种情况下发生倾斜。

前面已经提到根据分区字段进行分区,除非每个分区下本身的数据较少,分区字段选择不合理,那么小文件问题基本上就不存在了,但是也有可能由于shuffle引入新的数据倾斜问题。

我们首先可以尝试是否可以将两者结合使用, 在之前的sql上加上 distribute by ss_sold_date_sk,cast(rand() * 5 as int) , 这个类似于我们处理数据倾斜问题时候给字段加上后缀的形式。如,

按照之前的推算,每个分区下将产生5个文件,同时null值倾斜部分的数据也被打散成五份进行计算,缓解了数据倾斜的问题 ,我们最终将得到1825 *5=9105个文件,如下所示

如果我们将5改得更小,文件数也会越少,但相应的倾斜key的计算时间也会上去。

在我们知道那个分区键倾斜的情况下,我们也可以将入库的SQL拆成几个部分,比如我们store_sales是因为null值倾斜,我们就可以通过 where ss_sold_date_sk is not null 和 where ss_sold_date_sk is null 将原始数据分成两个部分。前者可以基于分区字段进行分区,如 distribute by ss_sold_date_sk 后者可以基于随机值进行分区, distribute by cast(rand() * 5 as int) , 这样可以静态的将null值部分分成五个文件。

对于倾斜部分的数据,我们可以开启Spark SQL的自适应功能, spark.sql.adaptive.enabled=true 来动态调整每个相当于Spark的reduce端task处理的数据量,这样我们就不需要人为的感知随机值的规模了,我们可以直接

然后Spark在Shuffle 阶段会自动的帮我们将数据尽量的合并成 spark.sql.adaptive.shuffle.targetPostShuffleInputSize (默认64m)的大小,以减少输出端写文件线程的总量,最后减少个数。

对于 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 参数而言,我们也可以设置成为 dfs.block.size 的大小,这样可以做到和块对齐,文件大小可以设置的最为合理。

在我们的 猛犸大数据平台 上面,随便的建立几个SQL作业,不用会Spark也可以用SQL把大数据玩得666!

双击每个工作节点,我们也可以对我们的SQL作业进行参数的调整

选中我们对应的实验组,点击执行后,可以查看任务的运行状态。

从各组的实验结果来看

实验组一的小文件控制还是可喜可贺的。对于我们1t的tpcds测试数据,null值分区字段下只有40个文件,其他每个数据分区也只有一个数据文件,总目录1825,总文件数1863. 在解决数据倾斜问题的基础上,也只比纯按照分区字段进行distibute by多了39个文件。

本文讲述的是如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。

数仓面试高频考点:

【在Hive中如何解析小文件过多问题,指定的是:处理表中数据时,有很多小文件】

| Table Parameters: | NULL | NULL |

| | bucketing_version | 2 |

| | numFiles | 1 |

| | numRows| 0 |

| | rawDataSize| 0 |

| | totalSize | 656 |

| | transient_lastDdlTime | 1631525001|

如果没有显示表的统计信息,执行如下命令,再次查看表信息

ANALYZE TABLE db_hive.emp COMPUTE STATISTICS

| Table Parameters: | NULL | NULL|

| | COLUMN_STATS_ACCURATE | {"BASIC_STATS":"true"} |

| | bucketing_version | 2 |

| | numFiles | 1 |

| | numRows| 14 |

| | rawDataSize| 643 |

| | totalSize | 656 |

| | transient_lastDdlTime | 1655113125 |

| | NULL | NULL|

第一种,将小文件合并成一个大文件

第二种,使用SparkContext中提供: wholeTextFiles 方法,专门读取小文件数据。

将每个文件作为一条KV存储在RDD中, K:文件名的绝对路径,V:文件的内容

用于解决小文件的问题,可以将多个小文件变成多个KV,自由指定分区个数

不论是Hive还是Spark SQL在使用过程中都可能会遇到小文件过多的问题。小文件过多最直接的表现是任务执行时间长,查看Spark log会发现大量的数据移动的日志。我们可以查看log中展现的日志信息,去对应的路径下查看文件的大小和个数。

通过上述命令可以查看文件的个数以及大小。count查看出的文件大小单位是B,需要转换为MB。

在spark官方的推荐文档中,parquet格式的文件推荐大小是128MB,小于该大小的均可以称之为小文件,在实际的工作,往往小文件的大小仅仅为几KB,表现为,可能文件大小为几百MB,但是文件个数可能到达了几十万个。一般来说,我们可以通过简单相除获得文件的平均大小,如果文件数目不多,我们也可以通过下述命令获得每个文件的大小。

1.任务执行时间长

2.真实的文件大小独占一个数据存储块,存放到DataNode节点中。同时 DataNode一般默认存三份副本,以保障数据安全。同时该文件所存放的位置也写入到NameNode的内存中,如果有Secondary NameNode高可用节点,也可同时复制一份过去。NameNode的内存数据将会存放到硬盘中,如果HDFS发生重启,将产生较长时间的元数据从硬盘读到内存的过程。

3.不论在Hive还是在Spark中,每一个存储块都对应一个Map程序,一个Map呈现就需要一个JVM,启动一个JVM去读取或者写小文件是吃力不讨好的行为。在实际的生产中,为了更好的管理集群资源,一般会要求程序执行时限制Executor数量和每个Executor的核心数量,需要频繁创建Executor来读取写入。

5.影响磁盘寻址时间

小文件合并,本质上就是通过某种 *** 作,将一系列小文件合并成大文件。我们知道,以MapReduce为代表的大数据系统,都习惯用K-V键值对的形式来处理文件,最后文件落盘,也是一个reduce对应一个输出文件。所以直观上,我们可以减少reduce数量,达到减少文件数量的目的。

从Map到Reduce需要一个Shuffle过程,所以我们将小文件合并理解为通过一个Shuffle,合并小文件成一个大文件。基于这样的思想,我们的策略可以分为两类:一类是原来的计算已经有Shuffle了,那么我们可以认为控制输出文件的数量;二类是强制触发Shuffle,进行小文件合并。

1-设置参数 (一般用于Hive)

2-distribute by rand()

往动态分区插入数据时,在已经写好的SQL末尾加上distribute by rand()

该算子只是起到打散的效果,但是我们还要设置文件的大小,以免打散后仍然有小文件。

表示每个reduce的大小,Hive可以数据总量,得到reduce个数,假设hive认为会有10个reduce,那么,这里rand()则会为 x % 10

3-group by

我们知道,group by算子会触发Shuffle,因此只要我们设置好Shuffle时的文件个数就好,在Spark SQL中,我们可以设置partition个数,因为一个partition会对应一个文件。

上述的 *** 作,会触发shuffle,因此我们再设置partition个数。

则表示,shuffle后,只会产生10个partition.

4-repartition()

5-coalesce()

需要注意的是,4和5都是spark 2.4以及以后才会支持的。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/7985733.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-12
下一篇 2023-04-12

发表评论

登录后才能评论

评论列表(0条)

保存