大数据@IT·大数据大数据

「Hive进阶篇」万字长文超详述hive企业级优化

2022-07-18  本文已影响0人  大数据阶梯之路

肝了几个晚上,梳理总结了一份万字长文超详述hive企业级优化文章,也整理了一份hive优化总结思维导图hive优化详细PDF文档有需要可关注公众号《大数据阶梯之路》找小编获取,学习和复习都是绝佳,公众号不断分享技术相关文章。话不多说,👇🏻下面就直接开讲吧!
更多精彩好文,首发在微信公众号《大数据阶梯之路》,欢迎关注

一览群山.jpeg

文章字数:13271字
预计阅读需:20分钟

一、问题背景

hive离线数仓开发,一个良好的数据任务,它的运行时长一般是在合理范围内的,当发现报表应用层的指标数据总是产出延迟,排查定位发现是有些任务执行了超10小时这样肯定是不合理的,此时就该想想如何优化ETL任务链路,主要从以下几个角度来考虑问题解决

  1. 从数据任务本身hive逻辑代码出发,即hive逻辑优化,偏理解业务角度
  2. 从集群的资源设置出发,即hive参数调优,偏理解技术角度
  3. 从全局数据链路的任务设置出发,观测是否任务执行调度设置不合理
  4. 从数仓的数据易用性和模型复用性的角度出发,针对某些中间逻辑过程可以复用的就落地中间模型表

附上一份个人梳理总结的思维导图部分截图

hive企业级优化思维导图.png

下面就先分享下常见的hive优化策略吧~ 会附带案例实践帮助理解

hive优化文章大纲

  1. 列裁剪和分区裁剪
  2. 提前数据收敛
  3. 谓词下推(PPD)
  4. 多路输出,减少表读取次数写多个结果表
  5. 合理选择排序
  6. join优化
  7. 合理选择文件存储格式和压缩方式
  8. 解决小文件过多问题
  9. distinct 和 group by
  10. 参数调优
  11. 解决数据倾斜问题

二、hive优化

1. 列裁剪和分区裁剪

裁剪 顾名思义就是不需要的数据不要多查。
尽量减少直接select * from table这种操作,首先可读性不好,根本不知道具体用到哪几个列,其次列选择多了也会增大IO传输;
分区裁剪就是针对分区表切记要加上分区过滤条件,比如表以时间作为分区字段,要加上分区筛选。

2. 提前数据收敛

在子查询中,有些条件能先过滤的尽量放在子查询里先过滤,减少子查询输出的数据量。

-- 原脚本
select
     a.字段a,a.字段b,b.字段a,b.字段b
from 
(
    select 字段a,字段b
    from table_a
    where dt = date_sub(current_date,1)
) a 
left join 
(
    select 字段a,字段b
    from table_b
    where dt = date_sub(current_date,1)
) b 
    on a.字段a = b.字段a
where a.字段b <> ''
and b.字段b <> 'xxx'
;

-- 优化脚本 (数据收敛)
select
     a.字段a,a.字段b,b.字段a,b.字段b
from 
(
    select 字段a,字段b
    from table_a
    where dt = date_sub(current_date,1)
    and 字段b <> ''
) a 
left join 
(
    select 字段a,字段b
    from table_b
    where dt = date_sub(current_date,1)
    and 字段b <> 'xxx'
) b 
    on a.字段a = b.字段a
;

3. 谓词下推(Predicate Pushdown)

谓词下推Predicate Pushdown是什么?简称PPD,指的是在不影响数据结果的情况下,将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据,这样在map执行过滤条件,可以减少map端数据输出,起到了数据收敛的作用,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能
hive默认是开启谓词下推该参数设置的,hive.optimize.ppd=true
所谓下推,即谓词过滤在map端执行;所谓不下推,即谓词过滤在reduce端执行。
关于谓词下推的规则,主要分为join的on条件过滤下推和where条件过滤下推,我整理了一张图方便理解。

image.png

核心判断逻辑:join的on条件过滤不能下推到保留行表中;where条件过滤不能下推到null补充表中。

-- 举例说明:以下脚本 on后面的a表条件过滤没有下推至map端运行而是在reduce端运行,where后面的b表条件过滤则有下推至map端运行
select
     a.字段a,a.字段b,b.字段a,b.字段b
from table_a a
left join table_b b
on a.字段a <> '' -- a表条件过滤
where a.字段b <> 'xxx' -- a表条件过滤
;

谓词下推注意事项:
如果在表达式中含有不确定函数,整个表达式的谓词将不会被下推。例如下面脚本,则整个条件过滤都是在reduce端执行:

select a.*
from a join b 
on a.id = b.id
where a.ds = '2019-10-09' 
and a.create_time = unix_timestamp()
;

因为上面unix_timestamp()是不确定函数,在编译的时候无法得知,所以,整个表达式不会被下推,即ds='2022-07-04'也不会被提前过滤。类似的不确定函数还有rand()函数等。

附上一篇关于谓词下推的案例分析讲解
!上链接:https://cloud.tencent.com/developer/article/1616687

4. 多路输出

当我们有使用一次查询,多次插入的场景时,则可以采用多路输出的写法,减少表的读取次数,起到性能优化的作用。

-- 读取一次源表,同时写入多张目标表
from table_source
insert overwrite table table_a
select *
where dt = date_sub(current_date,1)
and event_name = '事件A'
insert overwrite table table_b
select *
where dt = date_sub(current_date,1)
and event_name = '事件B'
insert oveewrite table table_c
select *
where dt = date_sub(current_date,1)
and event_name = '事件C'
;

多路输出注意事项:

  • 一般情况下,一个sql里面最多支持128路输出,超过了则会报错
  • 在多插往同一张分区表的不同分区时,不允许在一个sql里面多路输出时既包含insert overwrite和insert into,要统一操作

5. 合理选择排序

以下举个排序方式优化案例,取用户信息表(10亿数据量)中年龄排前100的用户信息以下案例实现也体现了一个大数据思想,分而治之,大job拆分小job。

-- 原脚本
select *
from tmp.user_info_table
where dt = '2022-07-04'
order by age -- 全局排序,只走一个reduce
limit 100
;

-- 优化脚本
set mapred.reduce.tasks=50; -- 设置reduce个数为50
select *
from tmp.user_info_table
where dt = '2022-07-04'
distribute by (case when age<20 then 0
        when age >=20 and age <= 40 then 1
        else 2
    end
) -- distribute by主要是为了控制map端输出的数据在reduce端中是如何划分的,防止map端数据随机分配到reduce。这里字段做case when判断是因为用户年龄的零散值会导致分布不均匀,起太多reduce本身也耗时浪费资源
sort by age -- 起多个reduce排序,保证单个reduce结果有序
limit 100 -- 取前100,因为是按照年龄局部排序过,所以前100个也一定是年龄最小的
;

排序选择的小结:

  • order by全局排序,但只有一个reducer执行,数据量大的话容易计算不过来,慎用
  • sort by局部排序,单个reducer内有序,把map端随机分发给reduce端执行,如果是要实现全局排序且走多个reducer的优化需求时,可以在外层嵌套一层,例如:select * from (select * from 表名 sort by 字段名 limit N) order by 字段名 limit N,这样就有2个Job,一个是内层的局部排序,一个是外层的归并全局排序
  • distribute by可以按照指定字段将数据进行hash分发到对应的reducer去执行
  • 当分区字段和排序字段相同时可以使用cluster by来简化distribute by+sort by的写法,但是cluster by排序只能是升序排序,不能指定排序规则是ASC或者DESC

6. join优化

hive在redurce阶段完成的join就是common join,在map阶段完成的join就是map join。

select a.*
from 
(
select 1 as id,'a' as name 
union all 
select 2 as id,'b' as name 
) a 
left semi join 
( 
select 1 as id,'b' as name 
union all 
select 1 as id,'c' as name 
) b 
    on a.id = b.id
    
-- 你猜left semi join结果是?
id  name
1   a
-- 而如果上面的脚本是join呢,结果?
id  name
1   a
1   a

left semi join注意事项:

  • 右表的条件过滤只能写在on后面,不能写在where后面
  • 最终结果只能展示左表的列,右表的列不能展示
  • left semi join与join的差异:主要在于右表有重复数据时,left semi join是遍历到右表一条数据后就跳过,只取一条,而join是一直遍历至右表最后一条数据,这也就是要注意实际数据场景是否有重复和是否要保留
set hive.auto.convert.join = true;
-- 大表小表的阈值设置(默认25M一下认为是小表)
set hive.mapjoin.smalltable.filesize=26214400;
-- 不做优化时的原始hql
select  a.id 
from a left join b
on a.id = b.id

1、空key过滤,过滤空key的数据
关联的过程是相同key对应的数据都会发送到相同的reducer上,如果某些空key过多是会导致内存不够的,从而引发join超时,所以如果不需要这类空key数据的时候,可以先过滤掉这些异常数据。

-- 做空key过滤优化时的hql,利用子查询先处理掉后再关联
select a.id 
from (select * from a where id is not null) a
join b
on a.id = b.id

2、空key转换,转换key的数据进行关联时打散key
当然,有时候空值的数据又不一定是异常数据,还是需要保留的,但是空key过多都分配到一个reducer去了,这样执行起来就算不内存溢出也会发生数据倾斜情况,数据倾斜的话对集群资源的利用率来看的话是极其不利的,我们可以通过把空key虚拟成随机数,但要保证不是同一个空key,从而降低数据倾斜概率,虽然这样在对关联键做处理反而会总体增长执行时间,但却减轻了reducer负担。

-- 做空key转换优化时的hql,利用case when判断加随机数
select a.id 
from a.left join b
on case when a.id is null then concat('hive'+rand()) else a.id end = b.id

7. 合理选择文件存储格式和压缩方式

关于这点,我专门写过一篇文章介绍hive常见的几种存储格式和压缩方式,具体可以去上次我写过的这篇文章看看
!上链接https://mp.weixin.qq.com/s/RndQKF5y9Mto7QfgiiAOvQ

8. 解决小文件过多问题

-- 第①种导入数据方式
insert into table A values();  -- 每执行一条语句hive表就产生一个文件,但这种导入数据方式生产环境少见;
-- 第②种导入数据方式
load data local path '本地文件/本地文件夹 路径' overwrite into table A;  -- 导入文件/文件夹`,即有多少个文件hive表就会产生多少个文件
-- 第③种导入数据方式
insert overwrite table A select * from B;  -- 通过查询的方式导入数据是生产环境最常见的

MR中 reduce 有多少个就输出多少个文件,文件数量 = reduce数量 * 分区数,如果说某些简单job没有reduce阶段只有map阶段,那文件数量 = map数量 * 分区数从公式上看,reduce的个数和分区数最终决定了输出的文件的个数,所以可以调整reduce的个数以及分区 达到控制hive表的文件数量。

1、使用hive自带的 concatenate 命令,来合并小文件
不过要注意的是concatenate命令只支持hive表存储格式是orcfile或者rcfile,还有该方式不支持指定合并后的文件数量

-- 对于非分区表
alter table test_table concatenate;
-- 对于分区表
alter table test_table partition(dt = '2022-07-16') concatenate;

2、调整参数减少Map数

-- 102400000B=102400KB=100M

-- 每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=102400000;
-- 一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;

-- 前3行设置是确定合并文件块的大小,>128M的文件按128M切块,>100M和<128M的文件按100M切块,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;    -- map执行前合并小文件
-- 设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true;
-- 设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true;
-- 设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000;
-- 当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge
set hive.merge.smallfiles.avgsize=16000000; 

3、调整参数减少Reduce数

-- hive中的分区函数 distribute by 正好是控制MR中partition分区的,然后通过设置reduce的数量,结合分区函数让数据均衡的进入每个reduce即可。

-- 直接设置reduce个数
set mapreduce.job.reduces=10;

-- 执行以下语句,将数据均衡的分配到reduce中
set mapreduce.job.reduces=10;
insert overwrite table A partition(dt)
select * from B
distribute by rand();
解释:如设置reduce数量为10,则使用 rand(), 随机生成一个数x % 10,这样数据就会随机进入 reduce 中,防止出现有的文件过大或过小

9. count(distinct ) 和 group by

在计算去重指标的时候,比如不同年龄段的用户数这个指标,一般都是采用count(distinct user_id)直接计算,当表数据量不大的话影响不大,但如果数据量大count distinct就很耗性能了,因为其只会用一个reduce task来执行,容易reduce端数据倾斜,通常优化就使用里层group by age然后再外层count(user_id)来替代。

注意事项:
关于使用里层group by age然后再外层count(user_id)来替代count(distinct user_id)直接去重计算是否一定就起到优化效果这也是看情况的,假设表数据量不是特别大,有些情况下里层group by age然后再外层count(user_id)未必就见得比count(distinct user_id)好。所以还是具体业务场景具体分析为好,优化从来不是考虑局部就好,要全局考虑。

  • hive3.x版本里已经新增了对count(distinct )的优化,通过set hive.optimize.countdistinct配置,即使真的出现数据倾斜也可以自动优化,自动改变SQL执行的逻辑
  • 里层group by age然后再外层count(user_id)这种方式会生成2个job任务,会消耗更多的磁盘网络I/O资源

10. 参数调优

11. 解决数据倾斜问题

1、Map端优化
通常情况下,Job会通过input目录产生一个或多个map任务,map数主要取决与input的文件总个数,文件总大小,集群设置的文件块大小。
从hadoop2.7.3版本开始,HDFS的默认块大小block size是128M。每张hive表在hdfs上对应存储都是一个文件,关于执行task时,每一个128M的文件都是一个块block,每个块就用一个map任务来完成,若文件超过128M就分块,若小于128M则独立成块。
那么:①当小文件过多怎么办?
答案是map任务增多,map任务的启动和初始化时间远大于执行逻辑处理时间,从而集群造成资源浪费。
②是不是让每个文件都接近128M大小就毫无问题了呢?
答案是不可能,假设一个文件大小127M,但表只有一两个字段,文件大小是由几千万条记录撑大的,如果数据处理逻辑复杂则用一个map任务去执行也是很耗时的。
③是不是map数越多越好?
答案是这种说法是片面的,map数增多有利于提升并行度,但一个map在启动和初始化时间是远大于执行逻辑处理时间,越多的map启动初始化就造成很大的集群资源浪费。

减少map数量,降低资源浪费,如何做?
以下相当于是把小文件合并成大文件处理 (多合一)

-- 102400000B=102400KB=100M

-- 每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=102400000;
-- 一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=102400000;
-- 一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=102400000;

-- 前3行设置是确定合并文件块的大小,>128M的文件按128M切块,>100M和<128M的文件按100M切块,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;    -- map执行前合并小文件

有时候对hive进行优化,在执行时间上可能没什么大的改观,但是在计算资源上就有很大改善。

增大map数量,分担每个map处理的数据量提升任务效率,如何做?
以下相当于是把小文件合并成大文件处理 (一拆多)

根据mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))从公式可以看出调整maxSize最大值,让maxSize最大值低于blocksize就可以增加map的个数。

mapreduce.input.fileinputformat.split.minsize(切片最小值),默认值=1,参数调的比blockSize大,则可以让切片变得比blocksize还大,从而减少map数
mapreduce.input.fileinputformat.split.maxsize(切片最大值),默认值=blocksize块大小,参数如果调到比blocksize小,则会让切片变小,从而增大map数

2、Reduce端优化
reduce个数设置过大也会产生很多小文件对namenode有影响,且输出的小文件偶尔也会作为下一个任务的输入导致出现小文件过多问题,设置过小又会导致单个reduce处理的数据量过大导致OOM异常。
不指定时则hive会默认根据计算公式hive.exec.reducers.bytes.per.reducer(每个reduce任务处理数据量,默认1G)和hive.exec.reducers.max(每个任务的最大reduce数,默认1009个),来做min(hive.exec.reducers.max值,总输入数据量/hive.exec.reducers.bytes.per.reducer值)计算,得出结果确定reduce个数,所以可以通过调整参数1和参数2来调整reduce个数,不过最简便的还是通过下面的参数来直接控制reduce个数。

-- 手动指定reduce个数
set mapred.reduce.tasks=50;
-- 设置每一个job中reduce个数
set mapreduce.job.reduces=50;

那么:①reduce数是不是越多越好?
答案是错误的,同map数一样,启动reduce和初始化同样耗时和占资源,而且过多的reduce会生成多个文件,同样会出现小文件问题。
②什么情况下当设置了参数指定reduce个数后还是只有单个reduce在跑?

合理设置map数和reduce数的小结:

  • set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //系统默认格式,设置在map执行前合并小文件,减少map数
  • set mapreduce.input.fileinputformat.split.maxsize = 100; //调整最大切片值,让maxSize值低于blocksize就可以增加map数
  • 根据mapreduce切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))从公式可以看出调整maxSize最大值,让maxSize最大值低于blocksize,从而使切片变小,就可以增加map的个数

三、总结

  1. 日常hive开发中时刻养成提前数据收敛的习惯,避免无用数据参与到计算中
  2. 不要过度进行优化,有可能做的是无用功甚至产生负效应,在调优上投入的工作成本和回报不成正比
  3. 对于公共可复用的逻辑代码,可以抽取出来落地临时表或者中间表,提升复用性,强调复用!
  4. 理解hiveQL底层执行的原理,优化起来才有章可循
  5. 理透需求是代码优化的前提,关注全局数据链路,一些常见的hive优化策略要懂
  6. 做hive优化的时候,涉及到参数调优时要慎重,比如把内存都申请抢占满了,避免因为你自己的任务调优了但影响到整个集群其他任务的资源分配,全局优才是优!
分享就到此结束了,建议收藏吸纳消化,博文不易,欢迎👏🏻点赞+转发+收藏,更多精彩好文,尽在微信公众号《大数据阶梯之路》
上一篇下一篇

猜你喜欢

热点阅读