Hive Sql 的优化案例

Hive Sql 的优化案例,第1张

Hive Sql 的优化案例

需求场景:

A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单、总金额是多少,为了专注于本节要解决的问题,N只取90天,汇总值仅取成交单数。A表的字段有: buyer_id、seller_id 和pay_cnt_90d。
B表为卖家基本信息表,其中包含卖家的一个分层评级信息,比如把卖家分为6个级别: S0、S1、S2、S3、S4 和S5.

要获得的结果是每个买家在各个级别卖家的成交比例信息,比如:
某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S4:10%;S5:10%;

正常的需求SQL 如下:

select
	m.buyer_id
	,sum(pay_cnt_90d) as pay_cnt_90d
	,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0
	,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1
	,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2
	,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3
	,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4
	,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5
	
from
(
	select 
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select buyer_id,seller_id,pay_cnt_90d
		from table_A
	) a
	join
	(
		select seller_id,s_level
		from table_B
	)b
	on a.seller_id = b.seller_id
)m
group by m.buyer_id

但是此SQL 会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分卖家90天内的买家数目并不多,join table_A 和 table_B 的时候ODPS 会按照 seller_id 进行分发,table_A 的大卖家引起了数据倾斜。

但是本数据倾斜问题无法用mapjoin table_B 解决,因为卖家有超过千万条、文件大小有几个GB 超过了 mapjoin 表最大1GB的限制。
 

优化方式一: 转化为 mapjoin

                  先对B表进行过滤,过滤的方式是 B表 join A表,筛选最近 90天内 发生交易的卖家。此方案在一些情况下可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内卖家不多,但还是有一些的,过滤后的B表仍然很大。

select
	m.buyer_id
	,sum(pay_cnt_90d) as pay_cnt_90d
	,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0
	,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1
	,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2
	,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3
	,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4
	,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5
	
from
(
	select  
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select buyer_id,seller_id,pay_cnt_90d
		from table_A
	) a
	join
	(
		select b0.seller_id,b0.s_level
		from table_B
		join
		(select seller_id from table_A group by seller_id) a0
		on b0.seller_id=a0.seller_id
	)b
	on a.seller_id = b.seller_id
)m
group by m.buyer_id
方案2: join 时用 case when 语句

           此种解决方案应用场景为: 倾斜的值是明确的而且数量很少,比如 null 值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值 concat 随机数,从而达到随机分发的目的,核心逻辑如下:

select 
	a.user_id,a.order_id,b.user_id
from table_a a
join table_b b
on (case when a.user_id is null then concat('hive',rand()) else a.user_id end ) = b.user_id

Hive 已对此进行了优化,只需要设置参数 skewinfo 和skewjoin 参数,不需要修改SQL 代码。
例如,由于table_B的值 "0" 和 "1" 引起了倾斜,值需要做出如下设置:
SET hive.optimize.skewinfo=table:(seller_id)[("0")("1")];
SET hive.optimize.skewinfo=true;

但是方案2也无法解决问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。


方案3: 倍数B表,再取模 join

1、 通用方案
此种方案的思路是建立一个numbers表,其值只有一列 int行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模 join。
 

select
	m.buyer_id
	,sum(pay_cnt_90d) as pay_cnt_90d
	,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0
	,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1
	,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2
	,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3
	,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4
	,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5
	
from
(
	select   
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select buyer_id,seller_id,pay_cnt_90d
		from table_A
	) a
	join
	(
		select 
		seller_id,s_level,member
		from table_B
		join
		members
	)b
	on a.seller_id = b.seller_id
	     and mod(a.pay_cnt_90d,10)+1 = b.member
)m
group by m.buyer_id

       此思路的核心在于:既然按照 seller id 分发会倾斜,那么再人工增加一列进行分发,
这样之前倾斜的值的倾斜程度会减为原来的 1/10 可以通过配置 nubmers 表修改放大倍数
来降低倾斜程度,但这样做的一个弊端是B表也会膨胀N倍。


2.专用方案
    通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可
    
    需要首先知道大卖家的名单,即先建立一个临时表动态存放每日最新的大卖家(比如 dim_big_seller),同时此表的大卖家
    要膨胀预先设定的倍数(比如 1000倍)。


select
	m.buyer_id
	,sum(pay_cnt_90d) as pay_cnt_90d
	,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0
	,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1
	,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2
	,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3
	,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4
	,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5
	
from
(
	select   
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select 
			buyer_id,seller_id,pay_cnt_90d,
			if(big.seller_id is not null,concat(table_A.seller_id,'rnd',cast(rand()*1000 as bigint),table_A.seller_id) as seller_id_joinkey
			from table_A
			left outer join
				--big 表seller有重复,请注意一定要group by 后再 join,保证table_A的行数保持不变
				(select seller_id from dim_big_seller group by seller_id) big
			on table_A.seller_id=big.seller_id
			) a
			join
			(
				select 
				seller_id,s_level,
				--big 表的 seller_id_joinkey生成逻辑和上面的生成逻辑一样
				coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey
				from table_B
				left outer join
					--table_B join 大卖家表后该表放大1000倍,其他卖家行数保持不变
					(select seller_id,seller_id_joinkey from dim_big_seller) big
				on table_B,seller_id=big.seller_id
			)b
				on a.seller_id_joinkey=b.seller_id_joinkey
			)m
group by m.buyer_id

相比通用方案,专用方案的运行效率明细好了很多,因为只是将B表中大卖的行数放大了 1000 倍,其他卖家的行数保持不变,
但同时也可以看到代码复杂了很多,而且必须首先建立大卖家表。


方案4: 动态一分为二


    实际上方案2和3都用到了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案就是动态一分为二,
    即对倾斜的键值分开处理,不倾斜的正常join即可,倾斜的把他们找出来然后做mapjoin,最后union all其结果即可。
    但是此种解决方案比较麻烦,代码会变得复杂而且需要一个临时表存放倾斜的键值。
    采用此解决方案的伪代码如下所示:

	-- 由于数据倾斜,先找出近90天买家超过10000的卖家
	insert overwrite table tmp_table_B
	select 
		m.seller_id,
		n.s_level,
	from(
		select
			seller_id
		from(
			select 
				seller_id,
				count(buyer_id) as byr_cnt
			from	
				table_A
			group by
				seller_id
		) a
		where
			a.byr_cnt>10000
	)m
	left outer join (
		select
			user_id,
			s_level
		from 
			table_B
	)n
	on m.seller_id=n.user_id;
	
	
	-- 对于 90天买家数超过 10000 的卖家直接mapjoin,对于其他卖家正常 join 即可
	

select
	m.buyer_id
	,sum(pay_cnt_90d) as pay_cnt_90d
	,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0
	,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1
	,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2
	,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3
	,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4
	,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5

from
(

	select 
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select buyer_id,seller_id,pay_cnt_90d
		from table_A
	) a
	join
	(
		select seller_id,a.s_level
		from table_A a
		left outer join tmp_table_B b
		on a.user_id=b.seller_id
		where b.seller_id is null
	)b
	on a.seller_id=b.seller_id


	union all
	
	select 
		a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
	from
	(
		select buyer_id,seller_id,pay_cnt_90d
		from table_A
	)a
	join
	(
		select seller_id,s_level
		from table_B
	)b
	on a.seller_id=b.seller_id
)m group by m.buyer_id
)m
group by m.buyer_id


总结起来,方案1、2 以及方案3中的通用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定的使用场景。
    方案3的专用方案和方案4是本节推荐的优化方案,但是它们都需要新建一个临时表来存放每日动态变化的大卖家。
    相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是维度表,不然统计结果会是错误的。方案4的解决方案最通用,自由度最高,但是对代码的更改也最大,甚至需要更改代码框架,
 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4667007.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存