两个RDD进行join *** 作(即 rdd1.join(rdd2)) 会导致shuffle ,这是因为join *** 作会对key一致的key-vlaue对进行合并,而** key相同的key-value对不太可能会在同一个partition , 因此很有可能是需要进行经过网络进行shuffle的,而shuffle会产生许多中间数据(小文件)并涉及到网络传输,这些通常比较耗时,Spark中要尽量避免shuffle 。
优化方法: 将小RDD的数据通过broadcast到每个executor中,各大RDD partition分别和小RDD做join *** 作 。
具体是:在driver端将小RDD转换成数组array并broadcast到各executor端,然后再各executor task中对各partion的大RDD的key-value对和小rdd的key-value对进行join;由于 每个executor端都有完整的小RDD ,因此小RDD的各partition 不需要shuffle 到RDD的各partition,小RDD广播到大RDD的各partition后, 各partition分别进行join,最后再执行reduce , 所有分区的join结果汇总到driver端 。
如有错误,敬请指正!
1数据存储及压缩优化
针对hive中表的存储格式通常有textfile和orc,压缩格式一般使用snappy。相比于 textfile格式存储,orc占有更少的存储。因为hive底层使用MR计算架构,数据流是hdfs到磁盘再到hdfs,而且会有很多次IO读写 *** 作,所以使用orc数据格式和snappy压缩策略可以降低IO读写,还能降低网络传输量,这样在一定程度上可以节省存储空间,还能提升hql的执行效率;
2 Hive Job优化
① 调节Jvm参数,重用Jvm;
② 合理设置Map个数;
③ 合理设置Reduce个数;
3 Sql语法优化
① 建表优化 :
1) Hive创建表的时候,可以建分区表,分桶表;
2) Hive创建表的时候,可以指定数据存储格式:TextFile、SequenceFile、RCfile 、ORCfile;
② 查询时优化 :
1) 列裁剪,在查询时只读取需要的列,避免全列扫描,不要使用select * from table;
2) 分区裁剪:在查询时只读取需要分区的数据,避免全表扫描;
3) 开启谓词下推:set hive.optimize.ppd = true,默认是true:
a. 将Sql语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量;
4) 大表join小表:
a. 开启MapJoin:set hive.auto.convert.join=true:
b. MapJoin是将Join双方比较小的那个表直接分发到各个Map进程的内存中,在 Map进程中进行Join *** 作, 这样就不用进行Reduce步骤 ,从而提高了速度( 大表left join小表才有效 ,小表left join大表会失效);
5) 大表join大表:
a. SMB Join :Sort Merge Bucket Join(数据不仅分桶了,而且每个桶数据是排好序了);
b. 开启SMB Join之后,底层是根据两个表join字段进行分桶存储,这样的话,两张表就变为了基于桶之间join关联查询,而不是基于整张表的join,减少了笛卡尔积;
6) 少用in,用left semi join替代in:
a. 原始写法:select a.id, a.name from a where a.id in (select b.id from b);
b. 用join改写:select a.id, a.name from a join b on a.id = b.id;
c. left semi join改写:select a.id, a.name from a left semi join b on a.id = b.id;
7) 用union all代替union,因为union all不需要去重,也不需要排序,效率高于union;
(每天1小题,进步1点点)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)