Spark计算中的数据倾斜

2022-01-13  本文已影响0人  天之見證

本文的讨论场景限定在spark计算引擎,但是并不局限于spark,相关的讨论可以迁移到其他的计算引擎
Spark计算中什么是数据倾斜?
所有的数据倾斜,以task粒度来说,就是单个task的数据条数,相比较其他task的数据条数大很多倍,具体我们可以通过task summary看出来,max的时候,读取的数据特别多

除了一种极端情况,数据条数差异不大,但是由于单个字段大小的不同,可能出现数据容量差异很大,可能一个task中1000条数据是100k,在另一个task中1000条数据是100m,这里不讨论这种情况

数据倾斜.png

那么为什么会有这种情况出现呢?我们分2种场景讨论:

1. 初始直接从存储层读数据

一般来讲,这一层不会出现读数据倾斜,因为使用hash分片来读数据,hash可以保证每个分区读取数据条数的稳定
但是如果是带有谓词下推的读取数据的话,会出现数据倾斜,这种情况无可厚非,而且不需要特别解决,因为后续的数据处理过程中,数据的再次shffule会磨平这种情况,所以不需要特别关注

2. 任务计算中

任务计算过程中,由于涉及到shuffle,导致数据会重分区,一些场景下容易造成数据倾斜

2.1 group by场景

group by的时候,如果使用的key有明显的倾斜倾向就会出现数据倾斜
例如某款应用的用户信息的详情表有如下字段:

-- user_tb
city_id: string  //用户所在城市id,例如深圳,广州
user_id: bigint  // 用户id
select city_id, count(1)
from user_tb
group by city_id

执行以上SQL的时候,spark会以city的hash做分区,可以想见如果某个分区里面有北京这样的城市的话,分区里面的数据会特别多
这种情况下的数据倾斜可以添加随机前缀+二次聚合来处理

2.2 join 场景

2.2.1 双表

双表join涉及到3个stage,以下面是sql为例

select user_tb.user_id, city_info.city_name
from user_tb
left join city_info
on user_tb.city_id = city_info.id

2个stage是分别读取user_tbcity_info的阶段,经历2个读文件的阶段,这个时候不会有数据倾斜,但是读完文件之后,由于下一个步骤是shuffle,所以2个表都要进行shuffle write,并且以city_id的hash来重分区,这时候就类似上面group by的场景了,会出现某些分区特别大,真正第3个stage来join数据的时候就可以看出差异了,具体可以参看上面数据倾斜的截图

在这种情况下,由于city_info比较小,可以采用mapjoin的方式,避免掉shuffle过程,直接一个stage就处理数据了

2.2.2 三表

为什么单拿出三表join的场景呢, 因为大多数情况下的时候可以将其看作2次双表join来处理,但是有些特殊的情况下,第二次的双表join的时候,数据的情况会有些变化,导致不好排查

以下面的sql为例:

select coalesce(user_df.id, user_di.id) id, other_user.info
from user_df --用户全量表
full join user_di --用户增量表
on user_df.id = user_di.id
left join other_user  --其他用户信息
on user_id.id = other_user.id

上面的写法是没有问题的,在第一个join的时候不会有数据倾斜
但是在第二个join的时候会有问题,示意图如下


三表join.jpg

比较难理解的是后面user_di的那部分数据会被补成null
由于user_dfuser_difull join来关联的,关联之后,我们如果输出user_idid字段,可以看到会有很多是null的,然后在这种场景下面再去join other_user表的时候,user_id.id就并不是它原来的值了,使用这个时候的user.id做分区的时候会有1个分区出现数据倾斜,并且这个分区是有很多null的,量级差不多就是user_dfuser_id的差集了,要解决这种数据也很容易,修改join的条件为

select coalesce(user_df.id, user_di.id) id, other_user.info
from user_df --用户全量表
full join user_di --用户增量表
on user_df.id = user_di.id
left join other_user  --其他用户信息
on coalesce(user_df.id, user_di.id) = other_user.id

最后一个on的调整将null的数据补充上去,就可以解决这个问题了

上一篇下一篇

猜你喜欢

热点阅读