需求场景:
A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单、总金额是多少,为了专注于本节要解决的问题,N只取90天,汇总值仅取成交单数。A表的字段有: buyer_id、seller_id 和pay_cnt_90d。
B表为卖家基本信息表,其中包含卖家的一个分层评级信息,比如把卖家分为6个级别: S0、S1、S2、S3、S4 和S5.
要获得的结果是每个买家在各个级别卖家的成交比例信息,比如:
某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S4:10%;S5:10%;
正常的需求SQL 如下:
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0 ,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1 ,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2 ,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4 ,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a join ( select seller_id,s_level from table_B )b on a.seller_id = b.seller_id )m group by m.buyer_id
但是此SQL 会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分卖家90天内的买家数目并不多,join table_A 和 table_B 的时候ODPS 会按照 seller_id 进行分发,table_A 的大卖家引起了数据倾斜。
但是本数据倾斜问题无法用mapjoin table_B 解决,因为卖家有超过千万条、文件大小有几个GB 超过了 mapjoin 表最大1GB的限制。
先对B表进行过滤,过滤的方式是 B表 join A表,筛选最近 90天内 发生交易的卖家。此方案在一些情况下可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内卖家不多,但还是有一些的,过滤后的B表仍然很大。
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0 ,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1 ,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2 ,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4 ,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a join ( select b0.seller_id,b0.s_level from table_B join (select seller_id from table_A group by seller_id) a0 on b0.seller_id=a0.seller_id )b on a.seller_id = b.seller_id )m group by m.buyer_id方案2: join 时用 case when 语句
此种解决方案应用场景为: 倾斜的值是明确的而且数量很少,比如 null 值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值 concat 随机数,从而达到随机分发的目的,核心逻辑如下:
select a.user_id,a.order_id,b.user_id from table_a a join table_b b on (case when a.user_id is null then concat('hive',rand()) else a.user_id end ) = b.user_id
Hive 已对此进行了优化,只需要设置参数 skewinfo 和skewjoin 参数,不需要修改SQL 代码。
例如,由于table_B的值 "0" 和 "1" 引起了倾斜,值需要做出如下设置:
SET hive.optimize.skewinfo=table:(seller_id)[("0")("1")];
SET hive.optimize.skewinfo=true;
但是方案2也无法解决问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。
方案3: 倍数B表,再取模 join
1、 通用方案
此种方案的思路是建立一个numbers表,其值只有一列 int行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模 join。
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0 ,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1 ,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2 ,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4 ,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a join ( select seller_id,s_level,member from table_B join members )b on a.seller_id = b.seller_id and mod(a.pay_cnt_90d,10)+1 = b.member )m group by m.buyer_id
此思路的核心在于:既然按照 seller id 分发会倾斜,那么再人工增加一列进行分发,
这样之前倾斜的值的倾斜程度会减为原来的 1/10 可以通过配置 nubmers 表修改放大倍数
来降低倾斜程度,但这样做的一个弊端是B表也会膨胀N倍。
2.专用方案
通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可
需要首先知道大卖家的名单,即先建立一个临时表动态存放每日最新的大卖家(比如 dim_big_seller),同时此表的大卖家
要膨胀预先设定的倍数(比如 1000倍)。
select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0 ,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1 ,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2 ,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4 ,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d, if(big.seller_id is not null,concat(table_A.seller_id,'rnd',cast(rand()*1000 as bigint),table_A.seller_id) as seller_id_joinkey from table_A left outer join --big 表seller有重复,请注意一定要group by 后再 join,保证table_A的行数保持不变 (select seller_id from dim_big_seller group by seller_id) big on table_A.seller_id=big.seller_id ) a join ( select seller_id,s_level, --big 表的 seller_id_joinkey生成逻辑和上面的生成逻辑一样 coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey from table_B left outer join --table_B join 大卖家表后该表放大1000倍,其他卖家行数保持不变 (select seller_id,seller_id_joinkey from dim_big_seller) big on table_B,seller_id=big.seller_id )b on a.seller_id_joinkey=b.seller_id_joinkey )m group by m.buyer_id
相比通用方案,专用方案的运行效率明细好了很多,因为只是将B表中大卖的行数放大了 1000 倍,其他卖家的行数保持不变,
但同时也可以看到代码复杂了很多,而且必须首先建立大卖家表。
方案4: 动态一分为二
实际上方案2和3都用到了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案就是动态一分为二,
即对倾斜的键值分开处理,不倾斜的正常join即可,倾斜的把他们找出来然后做mapjoin,最后union all其结果即可。
但是此种解决方案比较麻烦,代码会变得复杂而且需要一个临时表存放倾斜的键值。
采用此解决方案的伪代码如下所示:
-- 由于数据倾斜,先找出近90天买家超过10000的卖家 insert overwrite table tmp_table_B select m.seller_id, n.s_level, from( select seller_id from( select seller_id, count(buyer_id) as byr_cnt from table_A group by seller_id ) a where a.byr_cnt>10000 )m left outer join ( select user_id, s_level from table_B )n on m.seller_id=n.user_id; -- 对于 90天买家数超过 10000 的卖家直接mapjoin,对于其他卖家正常 join 即可 select m.buyer_id ,sum(pay_cnt_90d) as pay_cnt_90d ,sum(case when m.s_level=0 then pay_cnt_90d end ) as pay_cnt_90d_s0 ,sum(case when m.s_level=1 then pay_cnt_90d end ) as pay_cnt_90d_s1 ,sum(case when m.s_level=2 then pay_cnt_90d end ) as pay_cnt_90d_s2 ,sum(case when m.s_level=3 then pay_cnt_90d end ) as pay_cnt_90d_s3 ,sum(case when m.s_level=4 then pay_cnt_90d end ) as pay_cnt_90d_s4 ,sum(case when m.s_level=5 then pay_cnt_90d end ) as pay_cnt_90d_s5 from ( select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A ) a join ( select seller_id,a.s_level from table_A a left outer join tmp_table_B b on a.user_id=b.seller_id where b.seller_id is null )b on a.seller_id=b.seller_id union all select a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d from ( select buyer_id,seller_id,pay_cnt_90d from table_A )a join ( select seller_id,s_level from table_B )b on a.seller_id=b.seller_id )m group by m.buyer_id )m group by m.buyer_id
总结起来,方案1、2 以及方案3中的通用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定的使用场景。
方案3的专用方案和方案4是本节推荐的优化方案,但是它们都需要新建一个临时表来存放每日动态变化的大卖家。
相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是维度表,不然统计结果会是错误的。方案4的解决方案最通用,自由度最高,但是对代码的更改也最大,甚至需要更改代码框架,
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)