GROUP BY 场景下的数据倾斜
JOIN 场景下的数据倾斜
1) 由于空值导致的数据倾斜问题
2) 由于数据类型不一致,导致的转换问题,导致的数据倾斜
3) 业务数据本身分布不均,导致的数据倾斜,下面4个小场景
i.大表与小表JOIN (Map JOIN)
ii.大表与大表JOIN, 一张表数据分布均匀,另一张表数据特定的KEY(有限几个) 分布不均
iii.大表与大表JOIN, 一张表数据分布均匀,另一张表大量的KEY 分布不均
iiii.大表与大表JOIN, 桶表,进行表拆分
group by 场景下的其实比较简单,我们只需要在 HIVE 中设置如下两个参数即可 :
set hive.map.aggr=true
set hive.groupby.skewindata=true
我们看下,设置这两个参数为什么能解决 GROUP BY 的数据倾斜问题
set hive.map.aggr=true(默认 : true) 第一个参数表示在 Map 端进行预聚。 因为传到数据量小了,所以效率高了,可以缓解数据倾斜问题。
最主要的参数,其实是 set hive.groupby.skewindata=true
这个参数有什么作用呢。这场来说 GROUP BY 流程只会产生一个MR JOB。但是,设置这个参数为 true 以后, 原来 GROUP BY 的 MR JOB 会由原来的一个变为两个。
流程如下:
JOB1 .第一个作业会进行预处理,将数据进行预聚合,并随机分发到 不同的 Reducer 中。
Map流程 : 会生成两个job来执行group by,第一个job中,各个map是平均读取分片的,在map阶段对这个分片中的数据根据group by 的key进行局部聚合 *** 作,这里就相当于Combiner *** 作。
Shuffle流程:在第一次的job中,map输出的结果随机分区,这样就可以平均分到reduce中
Reduce流程: 在第一次的job中,reduce中按照group by的key进行分组后聚合,这样就在各个reduce中又进行了一次局部的聚合。
JOB2.读取上一个阶段MR的输出作为Map输入,并局部聚合。按照key分区,将数据分发到 Reduce 中,进行统计。
Map流程 : 因为第一个job中分区是随机的,所有reduce结果的数据的key也是随机的,所以第二个job的map读取的数据也是随机的key,所以第二个map中不存在数据倾斜的问题。
在第二个job的map中,也会进行一次局部聚合。
Shuffle流程 : 第二个job中分区是按照group by的key分区的,这个地方就保证了整体的group by没有问题,相同的key分到了同一个reduce中。
Reduce流程 :经过前面几个聚合的局部聚合,这个时候的数据量已经大大减少了,在最后一个reduce里进行最后的整体聚合。
SQL:
SELECT
pt,COUNT(1)
FROM datacube_salary_org
GROUP BY pt
开启前
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: datacube_salary_org
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: pt (type: string)
outputColumnNames: pt
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count(1)
keys: pt (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
开启后:
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: datacube_salary_org
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: pt (type: string)
outputColumnNames: pt
Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count(1)
keys: pt (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: rand() (type: double)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: partials
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: final
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
可以明显的看到开启优化后。增加了一层 JOB
hivedistribute
by
和group
by
的区别:
group
by是对检索结果的保留行进行单纯分组,一般总爱和聚合函数一块用例如AVG(),COUNT(),max(),main()等一块用。
distribute
by是控制在map端如何拆分数据给reduce端的。hive会根据distribute
by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort
by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集 *** 作。distribute
by刚好可以做这件事。因此,distribute
by经常和sort
by配合使用。
注:Distribute
by和sort
by的使用场景
1.Map输出的文件大小不均。
2.Reduce输出文件大小不均。
3.小文件过多。
4.文件超大。
抛砖引玉,不足之处还望大神指正~
不可以order by
会对输入做全局排序,因此只有一个
reducer
(多个reducer无法保证全局有序)
只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。
set hive.mapred.mode=nonstrict(default value /
默认值) set hive.mapred.mode=strict order by 和数据库中的Order by 功能一致,按照某一项
&
几项排序输出。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)