Spark中sortByKey是如何进行全局排序的

Spark中sortByKey是如何进行全局排序的,第1张

首先有一个问题

有一台服务器:24core 128G内存,要处理一个1T的数据怎么办?

要采用拆分策略,将1T的数据拆分成128G大小的块进入服务器计算。

1T数据拆分成了8个块P1-P8

而且要使P1的数据全部小于P2 P2数据全部小于P3以此类推

这就是分而治之的思想

在sortByKey之前将数据使用partitioner根据数据范围来分区,使得p1所有数据小于p2,p2所有数据小于p3。然后利用sortByKey算子对每一个partition进行分区,这样全局的数据就被排序了

Spark 0.8及以前 Hash Based Shuffle

Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

Spark 0.9 引入ExternalAppendOnlyMap

Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

Spark 1.4 引入Tungsten-Sort Based Shuffle

Spark 1.6 Tungsten-sort并入Sort Based Shuffle

Spark 2.0 Hash Based Shuffle退出历史舞台

Spark支持Hash Shuffle和Sort Shuffle,早期版本使用Hash Shuffle(包括优化后的Hash Shuffle)。Spark1.2起默认使用Sort Shuffle,并且Sort Shuffle在map端有三种实现,分别是UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter,根据运行时信息自动选择对应的实现。

补充说明:上面SortShuffleWriter中提到的Partition,不是RDD中的Partition,而是类似Spark Shuffle之Hash Shuffle中的bucket,如果没有单独说明,Sort Shuffle相关文章中的Partition均为bucket,和源码中的变量名保持一致。

其中Serializer支持relocation

上面提到UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation是指,Serializer可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。Serializer的这个属性会在UnsafeShuffleWriter进行排序时用到,具体参考Introduce internal Serializer API for determining if serializers support object relocation #5924。支持relocation的Serializer是KryoSerializer,Spark默认使用JavaSerializer,可以通过参数spark.serializer设置。

其中Sort Shuffle设置: 上述三种ShuffleWriter实现均由SortShuffleManager管理

一、Spark中的Sorted-Based Shuffle产出的结果是并没有排序的,也就是说Shuffle的Reduce阶段是没有进行排序 *** 作的,这点和MR不一样。

二、Spark中的Sorted-Based Shuffle只是中间结果排序,也就是说Shuffle的Mapper阶段在将bucket缓存Spill到磁盘的时候进行了排序 *** 作,生成了FileSegment,其中涉及到一个排序算法TimSort。合并FileSegment的为一个文件的同时,生成一个索引文件。

三、排序 *** 作相当于合并相同的Key,聚合数据在一起,便于后续Reducer阶段读取相应的数据。

Sorted-Based Shuffle 的核心是借助于 ExternalSorter 把每个 ShuffleMapTask 的输出,排序到一个文件中 (FileSegmentGroup),为了区分下一个阶段 Reducer Task 不同的内容,它还需要有一个索引文件 (Index) 来告诉下游 Stage 的并行任务,那一部份是属于你的。

Shuffle Map Task 在ExternalSorter 溢出到磁盘的时候,产生一组 File#(File Group是hashShuffle中的概念,理解为一个file文件池,这里为区分,使用File的概念,FileSegment根据PartionID排序) 和 一个索引文件,File 里的 FileSegement 会进行排序,在 Reducer 端有4个Reducer Task,下游的 Task 可以很容易跟据索引 (index) 定位到这个 Fie 中的哪部份 FileSegement 是属于下游的,它相当于一个指针,下游的 Task 要向 Driver 去碓定文件在那里,然后到了这个 File 文件所在的地方,实际上会跟 BlockManager 进行沟通,BlockManager 首先会读一个 Index 文件,根据它的命名则进行解析,比如说下一个阶段的第一个 Task,一般就是抓取第一个 Segment,这是一个指针定位的过程。

补充说明:Sort-Based Shuffle 最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容划分成不同 FileSegment 构成的单一文件 File,另外一个是索引文件 Index。

重要提示:在Sorted-Shuffle中会排序吗?Sort-Based Shuffle的Mapper端在 Sort and Spill 的过程中会排序 *** 作,而且是Spill到磁盘的时候再进行排序的。但在Reducer阶段的ApependOnlyMap过程不进行排序的。

Spark早期版本采用的是AppendOnlyMap来实现shuffle reduce阶段数据的聚合,当数据量不大时没什么问题,但当数据量很大时就会占用大量内存,最后可能OOM。所以从spark 0.9开始就引入了ExternalAppendOnlyMap来代替AppendOnlyMap。

之前做过一年的spark研发,之前在阿里与腾讯也做了很久的hive,所以对这方面比较了解。

第一:其实快多少除了跟spark与hive本身的技术实现外,也跟机器性能,底层 *** 作系统的参数优化息息相关,不能一概而论。

第二:hive 目前应该还是业界的主流,毕竟快与慢很多时候并非是至关重要的,对于一个生产系统来说,更重要的应该是稳定性,spark毕竟还算是比较新兴的事务,快确实快,但是稳定性上距离hive相差甚远。关于spark我们也修复了很多关于内存泄露的BUG,因为您问的是性能,所以不过多介绍(可以跟我要YDB编程指南,里面有我对这些BUG的修正)

第三:关于性能,我测试的可能不够全面,只能在排序与检索过滤上提供我之前的基于YDB的BLOCK sort测试报告供您参考(百度上贴word太费劲,您可以跟我要 word文档)。

排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个“刚需”,无论大数据采用的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的。

有着计算奥运会之称的Sort Benchmark全球排序每年都会举行一次,每年巨头都会在排序上进行巨大的投入,可见排序速度的高低有多么重要!但是对于大多数企业来说,动辄上亿的硬件投入,实在划不来、甚至远远超出了企业的项目预算。相比大数据领域的暴力排序有没有一种更廉价的实现方式?

在这里,我们为大家介绍一种新的廉价排序方法,我们称为blockSort。

500G的数据300亿条数据,只使用4台 16核,32G内存,千兆网卡的虚拟机即可实现 2~15秒的 排序 (可以全表排序,也可以与任意筛选条件筛选后排序)。

一、基本的思想是这样的,如下图所示:

1.将数据按照大小预先划分好,如划分成 大、中、小三个块(block)。

2.如果想找最大的数据,那么只需要在最大的那个块里去找就可以了。

3.这个快还是有层级结构的,如果每个块内的数据量很多,可以到下面的子快内进行继续查找,可以分多个层进行排序。

4.采用这种方法,一个亿万亿级别的数据(如long类型),最坏最坏的极端情况也就进行2048次文件seek就可以筛选到结果。

怎么样,原理是不是非常简单,这样数据量即使特别多,那么排序与查找的次数是固定的。

二、这个是我们之前基于spark做的性能测试,供大家参考

在排序上,YDB具有绝对优势,无论是全表,还是基于任意条件组合过滤,基本秒杀Spark任何格式。

测试结果(时间单位为秒)

三、当然除了排序上,我们的其他性能也是远远高于spark,这块大家也可以了解一下

1、与Spark txt在检索上的性能对比测试。

注释:备忘。下图的这块,其实没什么特别的,只不过由于YDB本身索引的特性,不想spark那样暴力,才会导致在扫描上的性能远高于spark,性能高百倍不足为奇。

下图为ydb相对于spark txt提升的倍数

2、这些是与 Parquet 格式对比(单位为秒)

3、与ORACLE性能对比

跟传统数据库的对比,已经没啥意义,Oracle不适合大数据,任意一个大数据工具都远超oracle 性能。

4.稽查布控场景性能测试

四、YDB是怎么样让spark加速的?

基于Hadoop分布式架构下的实时的、多维的、交互式的查询、统计、分析引擎,具有万亿数据规模下的秒级性能表现,并具备企业级的稳定可靠表现。

YDB是一个细粒度的索引,精确粒度的索引。数据即时导入,索引即时生成,通过索引高效定位到相关数据。YDB与Spark深度集成,Spark对YDB检索结果集直接分析计算,同样场景让Spark性能加快百倍。

五、哪些用户适合使用YDB?

1.传统关系型数据,已经无法容纳更多的数据,查询效率严重受到影响的用户。

2.目前在使用SOLR、ES做全文检索,觉得solr与ES提供的分析功能太少,无法完成复杂的业务逻辑,或者数据量变多后SOLR与ES变得不稳定,在掉片与均衡中不断恶性循环,不能自动恢复服务,运维人员需经常半夜起来重启集群的情况。

3.基于对海量数据的分析,但是苦于现有的离线计算平台的速度和响应时间无满足业务要求的用户。

4.需要对用户画像行为类数据做多维定向分析的用户。

5.需要对大量的UGC(User Generate Content)数据进行检索的用户。

6.当你需要在大数据集上面进行快速的,交互式的查询时。

7.当你需要进行数据分析,而不只是简单的键值对存储时。

8.当你想要分析实时产生的数据时。

ps: 说了一大堆,说白了最适合的还是踪迹分析因为数据量大,数据还要求实时,查询还要求快。这才是关键。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/8095444.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-13
下一篇 2023-04-13

发表评论

登录后才能评论

评论列表(0条)

保存