(2)数据倾斜
1、数据倾斜原因
数据倾斜主要是由于在reduce阶段,某些值过多导值相应的reduce处理缓慢导致的。数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算。
倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的mapreduce job 都很容易产生倾斜
常见表现为:
Hive 查询中某个reduce运行缓慢,进度卡在99.99%;伴随着数据倾斜,会出现任务被kill等各种诡异的表现。查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
Spark Streaming 中一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。 因为Spark Streaming程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成OOM。以及Saprk sql
2、count distinct 、join、和group by 容易产生数据倾斜
count distinct、group by、join等操作,这些都会触发Shuffle动作,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单点问题。
在日常应用中,常出现数据缺失(null)和为0等情况,这些数据处理不当也容易产生数据倾斜。
3、解决办法
(1)配置项设置
真实数据中数据倾斜是常见的, hadoop 中默认是使用(通用对)
hive.map.aggr=true #Map 端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
hive.exec.reducers.bytes.per.reducer = 1000000000
这个参数控制一个job会有多少个reducer来处理,依据的是输入文件的总大小。如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM 2、如果reduce太多: 产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。如果我们不指定mapred.reduce.tasks, hive会自动计算需要多少个reducer。上面的参数说明每个节点的reduce 默认是处理1G大小的数据。
hive.exec.reducers.max
这个参数控制最大的reducer的数量,不会影响mapre.reduce.tasks参数的设置。默认的max是999。
mapred.reduce.tasks
这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer。默认是-1
如果你的join 操 作也产生了数据倾斜,那么你可以在hive 中设定
set hive.optimize.skewjoin = true; �
set hive.skewjoin.key = skew_key_threshold (default = 100000)
#这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)
(2)根据实际业务调整
例如:说订单场景吧,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。
我们单独对这两个城市来做count,最后和其它城市做整合。
(3)Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group 再在外面包一层count,就可以了。(数据量小的时候不需要,这种情况只出现在数据量大的情况下)
(4)对于异常数据或者缺失数据,可以先过滤掉,或者用其他(多个不同)值代表;或者对key值使用哈希算法重新映射;或者对于异常数据单独计算。
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
(5)使用mapjoin和left semi join
使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.在判断小表不大于1G的情况下,使用map join
(6)在写SQL时进行列裁剪,去除无关对列,减少内存消耗。
(7)不同数据类型关联产生的倾斜问题。
例如:一张表 s8 的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。 s8 的日志中有 32 位字符串商品 id,也有数值商品 id,日志中类型是 string 的,但商品中的 数值 id 是 bigint 的。猜想问题的原因是把 s8 的商品 id 转成数值 id 做 hash 来分配 Reduce, 所以字符串 id 的 s8 日志,都到一个 Reduce 上了,解决的方法验证了这个猜测。
解决方法:把数据类型转换成字符串类型
4、sum,count,max,min等UDAF,不怕数据倾斜问题
hadoop在map端的汇总合并优化(类似于Combiner),使数据倾斜不成问题。