Spark计算中的数据倾斜
本文的讨论场景限定在spark计算引擎,但是并不局限于spark,相关的讨论可以迁移到其他的计算引擎
Spark计算中什么是数据倾斜?
所有的数据倾斜,以task粒度来说,就是单个task的数据条数,相比较其他task的数据条数大很多倍,具体我们可以通过task summary看出来,max的时候,读取的数据特别多
数据倾斜.png除了一种极端情况,数据条数差异不大,但是由于单个字段大小的不同,可能出现数据容量差异很大,可能一个task中1000条数据是100k,在另一个task中1000条数据是100m,这里不讨论这种情况
那么为什么会有这种情况出现呢?我们分2种场景讨论:
1. 初始直接从存储层读数据
一般来讲,这一层不会出现读数据倾斜,因为使用hash分片来读数据,hash可以保证每个分区读取数据条数的稳定
但是如果是带有谓词下推的读取数据的话,会出现数据倾斜,这种情况无可厚非,而且不需要特别解决,因为后续的数据处理过程中,数据的再次shffule会磨平这种情况,所以不需要特别关注
2. 任务计算中
任务计算过程中,由于涉及到shuffle,导致数据会重分区,一些场景下容易造成数据倾斜
2.1 group by
场景
group by
的时候,如果使用的key有明显的倾斜倾向就会出现数据倾斜
例如某款应用的用户信息的详情表有如下字段:
-- user_tb
city_id: string //用户所在城市id,例如深圳,广州
user_id: bigint // 用户id
select city_id, count(1)
from user_tb
group by city_id
执行以上SQL的时候,spark会以city的hash做分区,可以想见如果某个分区里面有北京这样的城市的话,分区里面的数据会特别多
这种情况下的数据倾斜可以添加随机前缀+二次聚合来处理
2.2 join 场景
2.2.1 双表
双表join涉及到3个stage,以下面是sql为例
select user_tb.user_id, city_info.city_name
from user_tb
left join city_info
on user_tb.city_id = city_info.id
2个stage是分别读取user_tb
和city_info
的阶段,经历2个读文件的阶段,这个时候不会有数据倾斜,但是读完文件之后,由于下一个步骤是shuffle,所以2个表都要进行shuffle write,并且以city_id的hash来重分区,这时候就类似上面group by
的场景了,会出现某些分区特别大,真正第3个stage来join数据的时候就可以看出差异了,具体可以参看上面数据倾斜的截图
在这种情况下,由于city_info
比较小,可以采用mapjoin
的方式,避免掉shuffle过程,直接一个stage就处理数据了
2.2.2 三表
为什么单拿出三表join的场景呢, 因为大多数情况下的时候可以将其看作2次双表join来处理,但是有些特殊的情况下,第二次的双表join的时候,数据的情况会有些变化,导致不好排查
以下面的sql为例:
select coalesce(user_df.id, user_di.id) id, other_user.info
from user_df --用户全量表
full join user_di --用户增量表
on user_df.id = user_di.id
left join other_user --其他用户信息
on user_id.id = other_user.id
上面的写法是没有问题的,在第一个join的时候不会有数据倾斜
但是在第二个join的时候会有问题,示意图如下
三表join.jpg
比较难理解的是后面user_di
的那部分数据会被补成null
由于user_df
和user_di
用full join
来关联的,关联之后,我们如果输出user_id
的id
字段,可以看到会有很多是null的,然后在这种场景下面再去join other_user
表的时候,user_id.id
就并不是它原来的值了,使用这个时候的user.id
做分区的时候会有1个分区出现数据倾斜,并且这个分区是有很多null
的,量级差不多就是user_df
和user_id
的差集了,要解决这种数据也很容易,修改join
的条件为
select coalesce(user_df.id, user_di.id) id, other_user.info
from user_df --用户全量表
full join user_di --用户增量表
on user_df.id = user_di.id
left join other_user --其他用户信息
on coalesce(user_df.id, user_di.id) = other_user.id
最后一个on
的调整将null
的数据补充上去,就可以解决这个问题了