Hive ETL性能优化
2018-02-09 本文已影响406人
西二旗老司机
一、目标
用更少的资源、更短的时间,完成任务计算。
二、方法论
- 收集数据:explain查看执行计划
-
定位瓶颈:
1)查看执行日志,定位哪个Stage(Job)时间长
2)查看Job日志,定位Map阶段慢还是Reduce阶段慢 -
诊断问题:
1)资源不足pending
2)数据倾斜
3)reduce数太少
4)... - 解决瓶颈:对症下药
三、Hive原理
掌握Hive优化方法,首先需要对Hive将SQL编译为MapReduce的过程深入理解。
1. Hive架构
image.pngimage.png
2. 编译阶段
分为六个阶段:
- Antlr定义语法规则,完成SQL词法、语法解析,将SQL转化为AST Tree(抽象语法树)
- 遍历AST Tree,抽象出QueryBlock(查询的基本组成单元)
- 遍历QueryBlock,翻译为OperatorTree(执行操作树)
- 逻辑层优化器:进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量
- 遍历OperatorTree,翻译为MapReduce任务(物理执行计划)
- 物理层优化器:进行MapReduce任务的优化,生成最终的执行计划
3. 执行计划
执行计划是一个DAG图,每个节点是一个MR Job(Stage ),通过Explain语句可查看执行计划:
image.png
4. MR原理
image.pngword count过程
image.png
image.png
5. Join原理
select u.name, o.orderid from order o join user u on o.uid = u.uid;
6. Group By原理
select rank, isonline, count(*) from city group by rank, isonline;
7. Distinct 原理
select dealid, count(distinct uid) num from order group by dealid;
四、Hive性能优化方法
1. Map阶段优化
常见原因及解决方法:
-
上游小文件过多,启动时等待资源时间长
解决方法:
1)mapred.min.split.size.per.node
节点上split的最小size,该参数决定了多个Data Node上的文件是否需要合并
2)mapred.min.split.size.per.rack
机架上split的最小size,该参数决定多个机架上的文件是否需要合并
3)set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
执行map前进行小文件合并 -
split大小设置不合理,过小 不能充分利用集群资源
解决方法:
1)split大小计算
splitSize = Math.max( mapred.min.split.size, Math.min( mapred.max.split.size, blockSize))
2)set mapred.max.split.size 默认为256000000
3)set mapred.min.split.size 默认为1
举例:
2. Shuffle阶段优化
优化要点:
-
提早过滤数据,减少shuffle数据量
如:在子查询中只选取需要的字段 -
适时使用MapJoin
1)适用场景:大表与小表Join
2)相关参数:
hive.auto.convert.join //default true
hive.mapjoin.smalltable.filesize //default 25000000
3)优点:
减少Reduce阶段
避免数据倾斜
在Map阶段将小表读入内存,顺序扫描大表完成Join
MapJoin分为两个阶段:
- 通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
- MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
3. Reduce阶段优化
优化要点:
-
关注数据倾斜
image.png
1)大小表Join
采用Map Join
2)调整参数
hive.map.aggr = true
hive.groupby.skewindata=true
3)空值倾斜
空值Key转为字符串加随机数Join
eg. select * from a left outer Join b on if(a.user_id is null,
concat('hive',rand()), a.user_id) = b.user_id;
4)倾斜数据单独处理后union -
Reduce资源调整
1)默认reduce个数
num_reduce_tasks = min[${hive.exec.reducers.max}, (${input.size} / $
{ hive.exec.reducers.bytes.per.reducer})]
2)适当加大reduce数
set mapred.reduce.tasks=999
4. HiveSQL整理优化
-
Count distinct优化
image.png - 尽量避免使用transform
- 优先使用分区字段过滤
- 尽量使用并行化(适用于子查询,union all)
set hive.exec.parallel=true; //default false
hive.exec.parallel.thread.number=8; // 可同时执行的job数