HIVE优化场景七--数据倾斜--group by 倾斜

HIVE优化场景七--数据倾斜--group by 倾斜,第1张

HIVE优化场景七--数据倾斜:

GROUP BY 场景下的数据倾斜

JOIN 场景下的数据倾斜

1) 由于空值导致的数据倾斜问题

2) 由于数据类型不一致,导致的转换问题,导致的数据倾斜

3) 业务数据本身分布不均,导致的数据倾斜,下面4个小场景

i.大表与小表JOIN (Map JOIN)

ii.大表与大表JOIN, 一张表数据分布均匀,另一张表数据特定的KEY(有限几个) 分布不均

iii.大表与大表JOIN, 一张表数据分布均匀,另一张表大量的KEY 分布不均

iiii.大表与大表JOIN, 桶表,进行表拆分

group by 场景下的其实比较简单,我们只需要在 HIVE 中设置如下两个参数即可 :

set hive.map.aggr=true

set hive.groupby.skewindata=true

我们看下,设置这两个参数为什么能解决 GROUP BY 的数据倾斜问题

set hive.map.aggr=true(默认 : true)   第一个参数表示在 Map 端进行预聚。 因为传到数据量小了,所以效率高了,可以缓解数据倾斜问题。

最主要的参数,其实是 set hive.groupby.skewindata=true 

这个参数有什么作用呢。这场来说 GROUP BY 流程只会产生一个MR JOB。但是,设置这个参数为 true 以后, 原来 GROUP BY 的 MR JOB 会由原来的一个变为两个。

流程如下:

JOB1 .第一个作业会进行预处理,将数据进行预聚合,并随机分发到 不同的 Reducer 中。

      Map流程 : 会生成两个job来执行group by,第一个job中,各个map是平均读取分片的,在map阶段对这个分片中的数据根据group by 的key进行局部聚合 *** 作,这里就相当于Combiner *** 作。

     Shuffle流程:在第一次的job中,map输出的结果随机分区,这样就可以平均分到reduce中  

     Reduce流程:  在第一次的job中,reduce中按照group by的key进行分组后聚合,这样就在各个reduce中又进行了一次局部的聚合。

JOB2.读取上一个阶段MR的输出作为Map输入,并局部聚合。按照key分区,将数据分发到 Reduce 中,进行统计。

      Map流程 : 因为第一个job中分区是随机的,所有reduce结果的数据的key也是随机的,所以第二个job的map读取的数据也是随机的key,所以第二个map中不存在数据倾斜的问题。

在第二个job的map中,也会进行一次局部聚合。

    Shuffle流程 :  第二个job中分区是按照group by的key分区的,这个地方就保证了整体的group by没有问题,相同的key分到了同一个reduce中。

    Reduce流程 :经过前面几个聚合的局部聚合,这个时候的数据量已经大大减少了,在最后一个reduce里进行最后的整体聚合。

SQL:

SELECT 

 pt,COUNT(1)

FROM datacube_salary_org

GROUP BY pt

开启前

STAGE DEPENDENCIES:                               

  Stage-1 is a root stage                         

  Stage-0 depends on stages: Stage-1             

STAGE PLANS:                                     

  Stage: Stage-1                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            alias: datacube_salary_org           

            Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

            Select Operator                       

              expressions: pt (type: string)     

              outputColumnNames: pt               

              Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

              Group By Operator                   

                aggregations: count(1)           

                keys: pt (type: string)           

                mode: hash                       

                outputColumnNames: _col0, _col1   

                Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                Reduce Output Operator           

                  key expressions: _col0 (type: string)

                  sort order: +                   

                  Map-reduce partition columns: _col0 (type: string)

                  Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                  value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: mergepartial                     

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0                                 

    Fetch Operator                               

      limit: -1                                   

      Processor Tree:                             

        ListSink                                 

开启后:

STAGE DEPENDENCIES:                               

  Stage-1 is a root stage                         

  Stage-2 depends on stages: Stage-1             

  Stage-0 depends on stages: Stage-2             

STAGE PLANS:                                     

  Stage: Stage-1                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            alias: datacube_salary_org           

            Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

            Select Operator                       

              expressions: pt (type: string)     

              outputColumnNames: pt               

              Statistics: Num rows: 7 Data size: 1628 Basic stats: COMPLETE Column stats: COMPLETE

              Group By Operator                   

                aggregations: count(1)           

                keys: pt (type: string)           

                mode: hash                       

                outputColumnNames: _col0, _col1   

                Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                Reduce Output Operator           

                  key expressions: _col0 (type: string)

                  sort order: +                   

                  Map-reduce partition columns: rand() (type: double)

                  Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

                  value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: partials                         

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2                                 

    Map Reduce                                   

      Map Operator Tree:                         

          TableScan                               

            Reduce Output Operator               

              key expressions: _col0 (type: string)

              sort order: +                       

              Map-reduce partition columns: _col0 (type: string)

              Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

              value expressions: _col1 (type: bigint)

      Reduce Operator Tree:                       

        Group By Operator                         

          aggregations: count(VALUE._col0)       

          keys: KEY._col0 (type: string)         

          mode: final                             

          outputColumnNames: _col0, _col1         

          Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

          File Output Operator                   

            compressed: false                     

            Statistics: Num rows: 3 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE

            table:                               

                input format: org.apache.hadoop.mapred.SequenceFileInputFormat

                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0                                 

    Fetch Operator                               

      limit: -1                                   

      Processor Tree:                             

        ListSink                                 

可以明显的看到开启优化后。增加了一层 JOB 

hive

distribute

by

和group

by

的区别:

group

by是对检索结果的保留行进行单纯分组,一般总爱和聚合函数一块用例如AVG(),COUNT(),max(),main()等一块用。

distribute

by是控制在map端如何拆分数据给reduce端的。hive会根据distribute

by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort

by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集 *** 作。distribute

by刚好可以做这件事。因此,distribute

by经常和sort

by配合使用。

注:Distribute

by和sort

by的使用场景

1.Map输出的文件大小不均。

2.Reduce输出文件大小不均。

3.小文件过多。

4.文件超大。

抛砖引玉,不足之处还望大神指正~

不可以

order by

会对输入做全局排序,因此只有一个

reducer

(多个reducer无法保证全局有序)

只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

set hive.mapred.mode=nonstrict(default value /

默认值) set hive.mapred.mode=strict order by 和数据库中的Order by 功能一致,按照某一项

&

几项排序输出。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9975921.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存