ClickHouse 引擎在行为分析场景下的 JOIN 优化
火山引擎增长分析 DataFinder 基于 ClickHouse 来进行行为日志的分析,ClickHouse 的主要版本是基于社区版改进开发的字节内部版本。
1. 背景
火山引擎增长分析 DataFinder 基于 ClickHouse 来进行行为日志的分析,ClickHouse 的主要版本是基于社区版改进开发的字节内部版本。主要的表结构:
![](https://img.haomeiwen.com/i1233356/84cf174e6310b237.png)
事件表:存储用户行为数据,以用户 ID 分 shard 存储。
--列出了主要的字段信息
CREATE TABLE tob_apps_all
(
`tea_app_id` UInt32, --应用ID
`device_id` String DEFAULT '', --设备ID
`time` UInt64,--事件日志接受时间
`event` String,--事件名称
`user_unique_id` String,--用户ID
`event_date` Date , --事件日志日期,由time转换而来
`hash_uid` UInt64 --用户ID hash过后的id,用来join降低内存消耗
)│
用户表:存储用户的属性数据,以用户 ID 分 shard 存储。
--列出了主要的字段信息
CREATE TABLE users_unique_all
(
`tea_app_id` UInt32, --应用ID
`user_unique_id` String DEFAULT '', -- 用户ID
`device_id` String DEFAULT '', -- 用户最近的设备ID
`hash_uid` UInt64,--用户ID hash过后的id,用来join降低内存消耗
`update_time` UInt64,--最近一次更新时间
`last_active_date` Date --用户最后活跃日期
)
设备表:存储设备相关的数据,以设备 ID 分 shard 存储。
--列出了主要的字段信息
CREATE TABLE devices_all
(
`tea_app_id` UInt32, --应用ID
`device_id` String DEFAULT '', --设备ID
`update_time` UInt64, --最近一次更新时间
`last_active_date` Date --用户最后活跃日期
)
业务对象表:存储业务对象相关的数据,每个 shard 存储全量的数据。
--列出了主要的字段信息
CREATE TABLE rangers.items_all
(
`tea_app_id` UInt32,
`hash_item_id` Int64,
`item_name` String, --业务对象名称。比如商品
`item_id` String, --业务对象ID。比如商品id 1000001
`last_active_date` Date
)
1.1 业务挑战
![](https://img.haomeiwen.com/i1233356/55742bf7e8152470.png)
随着接入应用以及应用的 DAU 日益增加,ClickHouse 表的事件量增长迅速;并且基于行为数据需要分析的业务指标越来越复杂,需要 JOIN 的表增多;我们遇到有一些涉及到 JOIN 的复杂 SQL 执行效率低,内存和 CPU 资源占用高,导致分析接口响应时延和错误率增加。
2. 关于 Clickhouse 的 JOIN
在介绍优化之前,先介绍一下基本的 ClickHouse JOIN 的类型和实现方式。
2.1 分布式 JOIN
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
基本执行过程:
一个 Clickhouse 节点作为 Coordinator 节点,给每个节点分发子查询,子查询 sql(tob_apps_all 替换成本地表,users_unique_all 保持不变依然是分布式表)。
每个节点执行 Coordinator 分发的 sql 时,发现 users_unique_all 是分布式表,就会去所有节点上去查询以下 SQL(一共有 N*N。N 为 shard 数量)。
-
SELECT device_id, hash_uid FROMusers_uniqueWHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
-
每个节点从其他 N-1个节点拉取2中子查询的全部数据,全量存储(内存 or 文件),进行本地 JOIN。
-
Coordinator 节点从每个节点拉取3中的结果集,然后做处理返回给 client。
存在的问题:
- 子查询数量放大。
- 每个节点都全量存储全量的数据。
2.2 分布式 Global JOIN
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
基本执行过程:
- 一个 Clickhouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行sql(tob_apps_all 替换成本地表,右表子查询替换成别名 ut)。
- Coordinator 节点去其他节点拉取 users_unique_all 的全部数据,然后分发到全部节点(作为1中别名表 ut 的数据)。
- 每个节点都会存储全量的2中分发的数据(内存or文件),进行本地 local join。
- Coordinator 节点从每个节点拉取3中的结果集,然后做处理返回给 client。
存在的问题:
- 每个节点都全量存储数据。
- 如果右表较大,分发的数据较大,会占用网络带宽资源。
2.3 本地 JOIN
SQL 里面只有本地表的 JOIN,只会在当前节点执行。
SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
(SELECT device_id,
hash_uid
FROM rangers.users_unique
WHERE tea_app_id = 268411
AND last_active_date>='2022-08-06') ut
ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
AND event='app_launch'
AND event_date='2022-08-06'
2.3.1 Hash join
- 右表全部数据加载到内存,再在内存构建 hash table。key 为 joinkey。
- 从左表分批读取数据,从右表 hash table匹配数据。
- 优点是:速度快 缺点是:右表数据量大的情况下占用内存。
2.3.2 Merge join
- 对右表排序,内部 block 切分,超出内存部分 flush 到磁盘上,内存大小通过参数设定。
- 左表基于 block 排序,按照每个 block 依次与右表 merge。
- 优点是:能有效控制内存 缺点是:大数据情况下速度会慢。
优先使用 hash join 当内存达到一定阈值后再使用 merge join,优先满足性能要求。
3. 解决方案
![](https://img.haomeiwen.com/i1233356/67406c0ee3795ff5.png)
3.1 避免JOIN
3.1.1 数据预生成
数据预生成(由 Spark/Flink 或者 Clickhouse 物化视图产出数据),形成大宽表,基于单表的查询是 ClickHouse 最为擅长的场景。
我们有个指标,实现的 SQL 比较复杂(如下),每次实时查询很耗时,我们单独建了一个表 table,由 Spark 每日构建出这个指标,查询时直接基于 table 查询。
SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
(SELECT event_date,hash_uid AS uc1,sum(et.float_params{ 'amount' }) AS value, count(1) AS cnt, value*cnt AS multiple
FROM tob_apps_all et GLOBAL ANY LEFT JOIN
(SELECT hash_uid AS join_key,int_profiles{ '$ab_time_34' }*1000 AS first_time
FROM users_unique_all
WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
ON et.hash_uid=upt.join_key
WHERE (查询条件)
GROUP BY uc1,event_date)
GROUP BY event_date;
数据量2300W,查询时间由7秒->0.008秒。当然这种方式,需要维护额外的数据构建任务。总的思路就是不要让 ClickHouse 实时去 JOIN。
![](https://img.haomeiwen.com/i1233356/d39274e5c68d95ba.png)
3.1.2 使用 IN 代替 JOIN
JOIN 需要基于内存构建 hash table 且需要存储右表全部的数据,然后再去匹配左表的数据。而 IN 查询会对右表的全部数据构建 hash set,但是不需要匹配左表的数据,且不需要回写数据到 block。
比如:
SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
(SELECT hash_uid AS join_key
FROM users_unique_all
WHERE app_id = 10000000
AND last_active_date >= '2022-01-01') upt
ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
GROUP BY event_date
可以改成如下形式:
SELECT event_date,
count()
FROM tob_apps_all
WHERE app_id = 10000000
AND event_date >= '2022-01-01'
AND event_date <= '2022-08-02'
AND hash_uid global IN
(SELECT hash_uid
FROM users_unique_all
WHERE (tea_app_id = 10000000)
AND (last_active_date >= '2022-01-01') )
GROUP BY event_date
如果需要从右表提取出属性到外层进行计算,则不能使用 IN 来代替 JOIN。
相同的条件下,上面的测试 SQL,由 JOIN 时的16秒优化到了 IN 查询时的11秒。
![](https://img.haomeiwen.com/i1233356/22af04139d966470.png)
3.2 更快的 JOIN
3.2.1 优先本地 JOIN
数据预先相同规则分区
也就是 Colocate JOIN。优先将需要关联的表按照相同的规则进行分布,查询时就不需要分布式的 JOIN。
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
比如事件表 tob_apps_all 和用户表 users_unique_all 都是按照用户 ID 来分 shard 存储的,相同的用户的两个表的数据都在同一个 shard 上,因此这两个表的 JOIN 就不需要分布式 JOIN 了。
distributed_perfect_shard 这个 settings key 是字节内部 ClickHouse 支持的,设置过这个参数,指定执行计划时就不会再执行分布式 JOIN 了。
基本执行过程:
- 一个 ClickHouse 节点作为 Coordinator 节点,分发查询。在每个节点上执行 sql(tob_apps_all、users_unique_all替换成本地表)。
- 每个节点都执行1中分发的本地表 join 的 SQL(这一步不再分发右表全量的数据)。
- 数据再回传到 coordinator 节点,然后返回给 client。
数据冗余存储
如果一个表的数据量比较小,可以不分 shard 存储,每个 shard 都存储全量的数据,例如我们的业务对象表。查询时,不需要分布式 JOIN,直接在本地进行 JOIN 即可。
SELECT count()
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT item_id
FROM items_all
WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
例如这个 SQL,items_all 表每个 shard 都存储同样的数据,这样也可以避免分布式 JOIN 带来的查询放大和全表数据分发问题。
3.2.2 更少的数据
不论是分布式 JOIN 还是本地 JOIN,都需要尽量让少的数据参与 JOIN,既能提升查询速度也能减少资源消耗。
SQL 下推
ClickHouse 对 SQL 的下推做的不太好,有些复杂的 SQL 下推会失效。因此,我们手动对 SQL 做了下推,目前正在测试基于查询优化器来帮助实现下推优化,以便让 SQL 更加简洁。
下推的 SQL:
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06'
AND 用户属性条件 1 OR 用户属性条件 2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
对应的不下推的 SQL:
SELECT
et.os_name,
ut.device_id AS user_device_id
FROM tob_apps_all AS et
ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM rangers.users_unique_all
WHERE (tea_app_id = 268411)
AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411)
AND (event = 'app_launch')
AND (event_date = '2022-08-06')
AND (ut.用户属性条件 1 OR ut.用户属性条件 2)
settings distributed_perfect_shard=1
可以看到,不下推的 SQL 更加简洁,直接基于 JOIN 过后的宽表进行过滤。但是 ClickHouse 可能会将不满足条件的 users_unique_all 数据也进行 JOIN。
我们使用中有一个复杂的 case,用户表过滤条件不下推有1千万+,SQL 执行了3000秒依然执行超时,而做了下推之后60秒内就执行成功了。
![](https://img.haomeiwen.com/i1233356/f4a3d923aa8832ad.png)
3.2.3 Clickhouse 引擎层优化
一个 SQL 实际在 Clickhouse 如何执行,对 SQL 的执行时间和资源消耗至关重要。社区版的 Clickhouse 在执行模型和 SQL 优化器上还要改进的空间,尤其是复杂 SQL 以及多 JOIN 的场景下。
执行模型优化
社区版的 Clickhouse 目前还是一个两阶段执行的执行模型。第一阶段,Coordinator 在收到查询后,将请求发送给对应的 Worker 节点。第二阶段,Worker 节点完成计算,Coordinator 在收到各 Worker 节点的数据后进行汇聚和处理,并将处理后的结果返回。
![](https://img.haomeiwen.com/i1233356/7839278d51dfb33d.png)
有以下几个问题:
- 第二阶段的计算比较复杂时,Coordinator 的节点计算压力大,容易成为瓶颈。
- 不支持 shuffle join,hash join 时右表为大表时构建慢,容易 OOM。
- 对复杂查询的支持不友好。
字节跳动 ClickHouse 团队为了解决上述问题,改进了执行模型,参考其他的分布式数据库引擎(例如 Presto 等),将一个复杂的 Query 按数据交换情况切分成多个 Stage,各 Stage 之间则通过 Exchange 完成数据交换。根据 Stage 依赖关系定义拓扑结构,产生 DAG 图,并根据 DAG 图调度 Stage。例如两表 JOIN,会先调度左右表读取 Stage,之后再调度 JOIN 这个 Stage, JOIN 的 Stage 依赖于左右表的 Stage。
![](https://img.haomeiwen.com/i1233356/b8c56e29304b0a8e.png)
举个例子
SELECT
et.os_name,
ut.device_id AS user_device_id,
dt.hash_did AS device_hashid
FROM tob_apps_all AS et
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_uid
FROM users_unique_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN
(
SELECT
device_id,
hash_did
FROM devices_all
WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10
Stage执行模型基本过程(可能的):
- 读取 tob_apps_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个Stage。
- 读取 users_unique_all 数据,按照 join key(hash_uid)进行 shuffle,数据分发到每个节点。这是一个 Stage。
- 上述两个表的数据,在每个节点上的数据进行本地JOIN,然后再按照 join key(device_id) 进行 shuffle。这是一个 Stage。
- 读取 devices_all 数据,按照 join key(device_id)进行 shuffle,这是一个Stage。
- 第3步、第4步的数据,相同 join key(device_id) 的数据都在同一个节点上,然后进行本地JOIN,这是一个 Stage。
- 汇总数据,返回 limit 10 的数据。这是一个 Stage。
统计效果如下:
![](https://img.haomeiwen.com/i1233356/347dec45dbbf42ea.png)
查询优化器
有了上面的 stage 的执行模型,可以灵活调整 SQL 的执行顺序,字节跳动 Clickhouse 团队自研了查询优化器,根据优化规则(基于规则和代价预估)对 SQL 的执行计划进行转换,一个执行计划经过优化规则后会变成另外一个执行计划,能够准确的选择出一条效率最高的执行路径,然后构建 Stage 的 DAG 图,大幅度降低查询时间。
下图描述了整个查询的执行流程,从 SQL parse 到执行期间所有内容全部进行了重新实现(其中紫色模块),构建了一套完整的且规范的查询优化器。
![](https://img.haomeiwen.com/i1233356/fec1195caf440af3.png)
还是上面的三表 JOIN 的例子,可能的一个执行过程是:
-
查询优化器发现 users_unique_all 表与 tob_apps_all 表的分 shard 规则一样(基于用户 ID ),所以就不会先对表按 join key 进行 shuffle,users_unique 与 tob_apps 直接基于本地表 JOIN,然后再按照 join key(device_id)进行 shuffle。这是一个 Stage。
-
查询优化器根据规则或者代价预估决定设备表 devices_all 是需要 broadcast join 还是 shuffle join。
-
如果 broadcast join:在一个节点查到全部的 device 数据,然后分发到其他节点。这是一个 Stage。
-
如果 shuffle join:在每个节点对 device 数据按照 join key(device_id) 进行 shuffle。这是一个 Stage。
-
汇总数据,返回 limit 10 的数据。这是一个 Stage。
效果:
可以看到,查询优化器能优化典型的复杂的 SQL 的执行效率,缩短执行时间。
![](https://img.haomeiwen.com/i1233356/d229de3710f38d6f.png)
4. 总结
ClickHouse 最为擅长的领域是一个大宽表来进行查询,多表 JOIN 时Clickhouse 性能表现不佳。作为业内领先的用户分析与运营平台,火山引擎增长分析 DataFinder 基于海量数据做到了复杂指标能够秒级查询。本文介绍了我们是如何优化 Clickhouse JOIN 查询的。
主要有以下几个方面:
- 减少参与 JOIN 的表以及数据量。
- 优先使用本地 JOIN,避免分布式 JOIN 带来的性能损耗。
- 优化本地 JOIN,优先使用内存进行 JOIN。
- 优化分布式 JOIN 的执行逻辑,依托于字节跳动对 ClickHouse 的深度定制化。