Hive常用算子实现原理简述--MapReduce版

Hive常用算子实现原理简述--MapReduce版,第1张

Hive中的常用算子包括distinct、join、group by、order by、distribute by、sort by、count等,这些 *** 作符在SQL中使用起来很方便,能快速达到我们想要的效果,但是这些算子在底层是怎么实现的呢?

order by很容易想到执行原理,在一个reduce中将所有记录按值排序即可。因此order by在数据量大的情况下执行时间非常长,容易out of memory,非特殊业务需求一般不使用。distribute by也比较明显,根据hash值将distribute的值分发到不同的reduce。sort by是小号的order by,只负责将本reducer中的值排序,达到局部有序的效果。sort by和distribute by配合使用风味更佳,二者可以合并简写为cluster by。count则更加明晰,在combiner或reducer处按相同键累加值就能得到。

比较复杂的是distinct、join、group by,本文重点讨论这三个算子在MapReduce引擎中的大致实现原理。班门弄斧,抛砖引玉。

map阶段,将group by后的字段组合作为key,如果group by单字段那么key就一个。将group by之后要进行的聚合 *** 作字段作为值,如要进行count,则value是1;如要sum另一个字段,则value就是该字段。

shuffle阶段,按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。

reduce阶段,将相同key的值累加或作其他需要的聚合 *** 作,得到结果

对group by的过程讲解的比较清楚的是这篇文章 http://www.mamicode.com/info-detail-2292193.html 图文并茂,很生动。

实例如下图,对应语句是 select rank, isonline, count(*) from city group by rank, isonline

如果group by出现数据倾斜,除去替换key为随机数、提前挑出大数量级key值等通用调优方法,适用于group by的特殊方法有以下几种:

(1)set hive.map.aggr=true,即开启map端的combiner,减少传到reducer的数据量,同时需设置参数hive.groupby.mapaggr.checkinterval 规定在 map 端进行聚合 *** 作的条目数目。

(2)设置mapred.reduce.tasks为较大数量,降低每个reducer处理的数据量。

(3)set hive.groupby.skewindata=true,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合 *** 作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合 *** 作。

Hive中有两种join方式:map join和common join

如果不显式指定map side join,或者没有达到触发自动map join的条件,那么会进行reduce端的join,即common join,这种join包含map、shuffle、reduce三个步骤。

(1)Map阶段

读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key。Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。然后按照key进行排序。

(2)Shuffle阶段

根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中

(3)Reduce阶段

根据key的值完成join *** 作,期间通过Tag来识别不同表中的数据。

以下面的SQL为例,可用下图所示过程大致表达其join原理。

SELECT u.name, o.orderid FROM user u JOIN order o ON u.uid = o.uid

关联字段是uid,因此以uid为map阶段的输出key,value为选取的字段name和标记源表的tag。shuffle阶段将相同key的键值对发到一起,reduce阶段将不同源表、同一key值的记录拼接起来,可能存在一对多的情况。

如果指定使用map join的方式,或者join的其中一张表小于某个体积(默认25MB),则会使用map join来执行。具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定。

Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true。

以下图为例说明map join如何执行,该图来自 http://lxw1234.com/archives/2015/06/313.htm ,博主是一个水平深厚又乐于分享的前辈,图片水印上也有其网址。

yarn会启动一个Local Task(在客户端本地执行的Task)--Task A,负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。

接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。

distinct一般和group by同时出现。

当distinct一个字段时,将group by的字段和distinct的字段组合在一起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这可以确保相同group by字段的记录都分到同一个reducer,并且map的输入天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到一个不同值,计数器就自增1,即可得到count distinct结果。例如下面的SQL语句,过程可以下图示意。

我暂时没有理解这是怎么实现的,别人写的也没有看明白。有善良的学富五车的大佬指点一下吗?

(二)数据倾斜的解决方案

1、参数调节

hive.map.aggr=true

Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true

有数据倾斜的时候 进行负载均衡 ,当选项设定为true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合 *** 作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中 ,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合 *** 作。

2、SQL语句调节

Join:

关于驱动表的选取:选用join key分布最均匀的表作为驱动表

做好列裁剪和filter *** 作,以达到两表做join的时候,数据量相对变小的效果。

大表与小表Join

使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

大表与大表Join:

把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

count distinct大量相同特殊值:

count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

group by维度过小:

采用sum() 与group by的方式来替换count(distinct)完成计算。

特殊情况特殊处理:

在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

3、应用场景

(一)空值产生的数据倾斜

场景:

如日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1 : user_id为空的不参与关联

解决方法2 :赋与空值新的key值

结论:

方法2比方法1效率更好,不但io少了,而且作业数也少了。

解决方法1中 log读取两次,job是2。

解决方法2中 job数是1 。这个优化适合无效 id (比如 -99 , ”, null 等) 产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。

(二)不同数据类型关联产生数据倾斜

场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join *** 作时,默认的Hash *** 作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。

解决方法 :把数字类型转换成字符串类型

(三)小表不小不大,怎么用 map join 解决倾斜问题

使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。

解决如下:

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

解决方法:

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/tougao/11300080.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-15
下一篇 2023-05-15

发表评论

登录后才能评论

评论列表(0条)

保存