在开始演示之前,我们先介绍下两个概念。
概念一,数据的可选择性基数,也就是常说的cardinality值。
查询优化器在生成各种执行计划之前,得先从统计信息中取得相关数据,这样才能估算每步 *** 作所涉及到的记录数,而这个相关数据就是cardinality。简单来说,就是每个值在每个字段中的唯一值分布状态。
比如表t1有100行记录,其中一列为f1。f1中唯一值的个数可以是100个,也可以是1个,当然也可以是1到100之间的任何一个数字。这里唯一值越的多少,就是这个列的可选择基数。
那看到这里我们就明白了,为什么要在基数高的字段上建立索引,而基数低的的字段建立索引反而没有全表扫描来的快。当然这个只是一方面,至于更深入的探讨就不在我这篇探讨的范围了。
概念二,关于HINT的使用。
这里我来说下HINT是什么,在什么时候用。
HINT简单来说就是在某些特定的场景下人工协助MySQL优化器的工作,使她生成最优的执行计划。一般来说,优化器的执行计划都是最优化的,不过在某些特定场景下,执行计划可能不是最优化。
比如:表t1经过大量的频繁更新 *** 作,(UPDATE,DELETE,INSERT),cardinality已经很不准确了,这时候刚好执行了一条SQL,那么有可能这条SQL的执行计划就不是最优的。为什么说有可能呢?
来看下具体演示
譬如,以下两条SQL,
A:
select * from t1 where f1 = 20B:
select * from t1 where f1 = 30如果f1的值刚好频繁更新的值为30,并且没有达到MySQL自动更新cardinality值的临界值或者说用户设置了手动更新又或者用户减少了sample page等等,那么对这两条语句来说,可能不准确的就是B了。
这里顺带说下,MySQL提供了自动更新和手动更新表cardinality值的方法,因篇幅有限,需要的可以查阅手册。
那回到正题上,MySQL 8.0 带来了几个HINT,我今天就举个index_merge的例子。
示例表结构:
mysql>desc t1+------------+--------------+------+-----+---------+----------------+| Field | Type | Null | Key | Default | Extra |+------------+--------------+------+-----+---------+----------------+| id | int(11) | NO | PRI | NULL | auto_increment || rank1 | int(11) | YES | MUL | NULL | || rank2 | int(11) | YES | MUL | NULL | || log_time | datetime | YES | MUL | NULL | || prefix_uid | varchar(100) | YES | | NULL | || desc1 | text | YES | | NULL | || rank3 | int(11) | YES | MUL | NULL | |+------------+--------------+------+-----+---------+----------------+7 rows in set (0.00 sec)表记录数:
mysql>select count(*) from t1+----------+| count(*) |+----------+| 32768 |+----------+1 row in set (0.01 sec)这里我们两条经典的SQL:
SQL C:
select * from t1 where rank1 = 1 or rank2 = 2 or rank3 = 2SQL D:
select * from t1 where rank1 =100 and rank2 =100 and rank3 =100表t1实际上在rank1,rank2,rank3三列上分别有一个二级索引。
那我们来看SQL C的查询计划。
显然,没有用到任何索引,扫描的行数为32034,cost为3243.65。
mysql>explain format=json select * from t1 where rank1 =1 or rank2 = 2 or rank3 = 2\G*************************** 1. row ***************************EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "3243.65" }, "table": { "table_name": "t1", "access_type": "ALL", "possible_keys": [ "idx_rank1", "idx_rank2", "idx_rank3" ], "rows_examined_per_scan": 32034, "rows_produced_per_join": 115, "filtered": "0.36", "cost_info": { "read_cost": "3232.07", "eval_cost": "11.58", "prefix_cost": "3243.65", "data_read_per_join": "49K" }, "used_columns": [ "id", "rank1", "rank2", "log_time", "prefix_uid", "desc1", "rank3" ], "attached_condition": "((`ytt`.`t1`.`rank1` = 1) or (`ytt`.`t1`.`rank2` = 2) or (`ytt`.`t1`.`rank3` = 2))" } }}1 row in set, 1 warning (0.00 sec)我们加上hint给相同的查询,再次看看查询计划。
这个时候用到了index_merge,union了三个列。扫描的行数为1103,cost为441.09,明显比之前的快了好几倍。
mysql>explain format=json select /*+ index_merge(t1) */ * from t1 where rank1 =1 or rank2 = 2 or rank3 = 2\G*************************** 1. row ***************************EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "441.09" }, "table": { "table_name": "t1", "access_type": "index_merge", "possible_keys": [ "idx_rank1", "idx_rank2", "idx_rank3" ], "key": "union(idx_rank1,idx_rank2,idx_rank3)", "key_length": "5,5,5", "rows_examined_per_scan": 1103, "rows_produced_per_join": 1103, "filtered": "100.00", "cost_info": { "read_cost": "330.79", "eval_cost": "110.30", "prefix_cost": "441.09", "data_read_per_join": "473K" }, "used_columns": [ "id", "rank1", "rank2", "log_time", "prefix_uid", "desc1", "rank3" ], "attached_condition": "((`ytt`.`t1`.`rank1` = 1) or (`ytt`.`t1`.`rank2` = 2) or (`ytt`.`t1`.`rank3` = 2))" } }}1 row in set, 1 warning (0.00 sec)我们再看下SQL D的计划:
不加HINT,
mysql>explain format=json select * from t1 where rank1 =100 and rank2 =100 and rank3 =100\G*************************** 1. row ***************************EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "534.34" }, "table": { "table_name": "t1", "access_type": "ref", "possible_keys": [ "idx_rank1", "idx_rank2", "idx_rank3" ], "key": "idx_rank1", "used_key_parts": [ "rank1" ], "key_length": "5", "ref": [ "const" ], "rows_examined_per_scan": 555, "rows_produced_per_join": 0, "filtered": "0.07", "cost_info": { "read_cost": "478.84", "eval_cost": "0.04", "prefix_cost": "534.34", "data_read_per_join": "176" }, "used_columns": [ "id", "rank1", "rank2", "log_time", "prefix_uid", "desc1", "rank3" ], "attached_condition": "((`ytt`.`t1`.`rank3` = 100) and (`ytt`.`t1`.`rank2` = 100))" } }}1 row in set, 1 warning (0.00 sec)加了HINT,
mysql>explain format=json select /*+ index_merge(t1)*/ * from t1 where rank1 =100 and rank2 =100 and rank3 =100\G*************************** 1. row ***************************EXPLAIN: { "query_block": { "select_id": 1, "cost_info": { "query_cost": "5.23" }, "table": { "table_name": "t1", "access_type": "index_merge", "possible_keys": [ "idx_rank1", "idx_rank2", "idx_rank3" ], "key": "intersect(idx_rank1,idx_rank2,idx_rank3)", "key_length": "5,5,5", "rows_examined_per_scan": 1, "rows_produced_per_join": 1, "filtered": "100.00", "cost_info": { "read_cost": "5.13", "eval_cost": "0.10", "prefix_cost": "5.23", "data_read_per_join": "440" }, "used_columns": [ "id", "rank1", "rank2", "log_time", "prefix_uid", "desc1", "rank3" ], "attached_condition": "((`ytt`.`t1`.`rank3` = 100) and (`ytt`.`t1`.`rank2` = 100) and (`ytt`.`t1`.`rank1` = 100))" } }}1 row in set, 1 warning (0.00 sec)对比下以上两个,加了HINT的比不加HINT的cost小了100倍。
总结下,就是说表的cardinality值影响这张的查询计划,如果这个值没有正常更新的话,就需要手工加HINT了。相信MySQL未来的版本会带来更多的HINT。
如果面试时被问到spark任务如何调优,我们该如何回答呢?
下面我们从四大方面回答这个问题,保证吊打面试官。
一、spark性能调优
1、分配更多的资源
比如增加执行器个数(num_executor)、增加执行器个数(executor_cores)、增加执行器内存(executor_memory)
2、调节并行度
spark.default.parallelism
3、重构RDD架构以及RDD持久化
尽量去复用RDD,差不多的RDD可以抽取成一个共同的RDD,公共RDD一定要实现持久化
4、广播变量
SparkContext.broadcast方法创建一个对象,通过value方法访问
5、使用kryo序列化
SparkConf中设置属性:spark.serializer: org.apache.spark.serializer.kryoSerializer
6、使用fastutil优化数据格式(代替java中的Array、List、Set、Map)
7、调节数据本地化等待时长
调节参数: spark.locality.wait
二、JVM调优
降低cache *** 作的内存占比 1.6版本之前使用的是静态内存管理
spark中堆内存被划分为两块:
一块是专门来给RDD作cachepersist持久化的 StorageMemory,另一块是给spark算子函数运行使用的,存放函数中自己创建的对象。
1.6版本之后采用统一内存管理机制
storage和execution各占50%,若己方不足对方空余可占用对方空间
可尝试调节executor堆外内存
spark.yarn.executor.memoryOverhead = 2048m
调节连接等待时长
spark.core.connection.ack.wait.timeout = 300
三、shuffle数据倾斜调优
1、预聚合源数据,对hive源表提前进行聚合 *** 作,在hive聚合之后,spark任务再去读取
2、检查倾斜的key是否是脏数据,可以提前过滤
3、提高shuffle *** 作reduce的并行度
4、使用随机key实现双重聚合
5、将reduce端 join转换成map端 join
6、sample采样倾斜key,单独进行join后在union
7、使用随机数以及扩容表进行join
四、算子调优
1、使用mapPartition提升map类 *** 作的性能
2、filter过后使用coalesce减少分区数量
3、使用foreachPartition优化写数据性能
4、使用repartition解决sparkSql低并行度的性能问题
5、reduceByKey替换groupByKey实现map读预聚合
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)