Flink SQL 在字节跳动的优化与实践
整理 | Aven (Flink 社区志愿者)
摘要:本文由 Apache Flink Committer,字节跳动架构研发工程师李本超分享,以四个章节来介绍 Flink 在字节的应用实战。 内容如下:
- 整体介绍
- 实践优化
- 流批一体
- 未来规划
一、整体介绍

2018 年 12 月 Blink 宣布开源,经历了约一年的时间 Flink 1.9 于 2019 年 8 月 22 发布。在 Flink 1.9 发布之前字节跳动内部基于 master 分支进行内部的 SQL 平台构建。经历了 2~3 个月的时间字节内部在 19 年 10 月份发布了基于 Flink 1.9 的 Blink planner 构建的 Streaming SQL 平台,并进行内部推广。在这个过程中发现了一些比较有意思的需求场景,以及一些较为奇怪的 BUG。
基于 1.9 的 Flink SQL 扩展
虽然最新的 Flink 版本已经支持 SQL 的 DDL,但 Flink 1.9 并不支持。字节内部基于 Flink 1.9 进行了 DDL 的扩展支持以下语法:
- create table
- create view
- create function
- add resource
同时 Flink 1.9 版本不支持的 watermark 定义在 DDL 扩展后也支持了。
我们在推荐大家尽量的去用 SQL 表达作业时收到很多“SQL 无法表达复杂的业务逻辑”的反馈。时间久了发现其实很多用户所谓的复杂业务逻辑有的是做一些外部的 RPC 调用,字节内部针对这个场景做了一个 RPC 的维表和 sink,让用户可以去读写 RPC 服务,极大的扩展了 SQL 的使用场景,包括 FaaS 其实跟 RPC 也是类似的。在字节内部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等维表的支持。
同时还实现了多个内部使用的 connectors:
- source: RocketMQ
- sink:
RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics
并且为 connector 开发了配套的 format:PB/Binlog/Bytes。
在线的界面化 SQL 平台

除了对 Flink 本身功能的扩展,字节内部也上线了一个 SQL 平台,支持以下功能:
- SQL 编辑
- SQL 解析
- SQL 调试
- 自定义 UDF 和 Connector
- 版本控制
- 任务管理
二、实践优化
除了对功能的扩展,针对 Flink 1.9 SQL 的不足之处也做了一些优化。
Window 性能优化
1、支持了 window Mini-Batch
Mini-Batch 是 Blink planner 的一个比较有特色的功能,其主要思想是积攒一批数据,再进行一次状态访问,达到减少访问状态的次数降低序列化反序列化的开销。这个优化主要是在 RocksDB 的场景。如果是 Heap 状态 Mini-Batch 并没什么优化。在一些典型的业务场景中,得到的反馈是能减少 20~30% 左右的 CPU 开销。
2、扩展 window 类型
目前 SQL 中的三种内置 window,滚动窗口、滑动窗口、session 窗口,这三种语意的窗口无法满足一些用户场景的需求。比如在直播的场景,分析师想统计一个主播在开播之后,每一个小时的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指标。自然的滚动窗口的划分方式并不能够满足用户的需求,字节内部就做了一些定制的窗口来满足用户的一些共性需求。
-- my_window 为自定义的窗口,满足特定的划分方式
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
my_window(ts, INTERVAL '1' HOURS)
3、window offset
这是一个较为通用的功能,在 Datastream API 层是支持的,但 SQL 中并没有。这里有个比较有意思的场景,用户想要开一周的窗口,一周的窗口变成了从周四开始的非自然周。因为谁也不会想到 1970 年 1 月 1 号那天居然是周四。在加入了 offset 的支持后就可以支持正确的自然周窗口。
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)
维表优化
1、延迟 Join
维表 Join 的场景下因为维表经常发生变化尤其是新增维度,而 Join 操作发生在维度新增之前,经常导致关联不上。
所以用户希望如果 Join 不到,则暂时将数据缓存起来之后再进行尝试,并且可以控制尝试次数,能够自定义延迟 Join 的规则。这个需求场景不单单在字节内部,社区的很多同学也有类似的需求。
基于上面的场景实现了延迟 Join 功能,添加了一个可以支持延迟 Join 维表的算子。当 Join 没有命中,local cache 不会缓存空的结果,同时将数据暂时保存在一个状态中,之后根据设置定时器以及它的重试次数进行重试。

2、维表 Keyby 功能

通过拓扑我们发现 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因为它没有一个 key 的语义。
当作业并行度比较大,每一个维表 Join 的 subtask,访问的是所有的缓存空间,这样对缓存来说有很大的压力。
但观察 Join 的 SQL,等值 Join 是天然具有 Hash 属性的。直接开放了配置,运行用户直接把维表 Join 的 key 作为 Hash 的条件,将数据进行分区。这样就能保证下游每一个算子的 subtask 之间的访问空间是独立的,这样可以大大的提升开始的缓存命中率。
除了以上的优化,还有两点目前正在开发的维表优化。
1、广播维表:有些场景下维表比较小,而且更新不频繁,但作业的 QPS 特别高。如果依然访问外部系统进行 Join,那么压力会非常大。并且当作业 Failover 的时候 local cache 会全部失效,进而又对外部系统造成很大访问压力。那么改进的方案是定期全量 scan 维表,通过Join key hash 的方式发送到下游,更新每个维表 subtask 的缓存。
2、Mini-Batch:主要针对一些 I/O 请求比较高,系统又支持 batch 请求的能力,比如说 RPC、HBase、Redis 等。以往的方式都是逐条的请求,且 Async I/O 只能解决 I/O 延迟的问题,并不能解决访问量的问题。通过实现 Mini-Batch 版本的维表算子,大量降低维表关联访问外部存储次数。
Join 优化
目前 Flink 支持的三种 Join 方式;分别是 Interval Join、Regular Join、Temporal Table Function。
前两种语义是一样的流和流 Join。而 Temporal Table 是流和表的的 Join,右边的流会以主键的形式形成一张表,左边的流去 Join 这张表,这样一次 Join 只能有一条数据参与并且只返回一个结果。而不是有多少条都能 Join 到。
它们之间的区别列了几点:

可以看到三种 Join 方式都有它本身的一些缺陷。
- Interval Join 目前使用上的缺陷是它会产生一个 out join 数据和 watermark 乱序的情况。
- Regular Join 的话,它最大的缺陷是 retract 放大(之后会详细说明这个问题)。
- Temporal table function 的问题较其它多一些,有三个问题。
- 不支持 DDl
- 不支持 out join 的语义 (FLINK-7865 的限制)
- 右侧数据断流导致 watermark 不更新,下游无法正确计算 (FLINK-18934)
对于以上的不足之处字节内部都做了对应的修改。
增强 Checkpoint 恢复能力
对于 SQL 作业来说一旦发生条件变化都很难从 checkpoint 中恢复。
SQL 作业确实从 checkpoint 恢复的能力比较弱,因为有时候做一些看起来不太影响 checkpoint 的修改,它仍然无法恢复。无法恢复主要有两点;
- 第一点:operate ID 是自动生成的,然后因为某些原因导致它生成的 ID 改变了。
- 第二点:算子的计算的逻辑发生了改变,即算子内部的状态的定义发生了变化。
例子1:并行度发生修改导致无法恢复。

source 是一个最常见的有状态的算子,source 如果和之后的算子的 operator chain 逻辑发生了改变,是完全无法恢复的。
下图左上是正常的社区版的作业会产生的一个逻辑, source 和后面的并行度一样的算子会被 chain 在一起,用户是无法去改变的。但算子并行度是常会会发生修改,比如说 source 由原来的 100 修改为 50,cacl 的并发是 100。此时 chain 的逻辑就会发生变化。

针对这种情况,字节内部做了修改,允许用户去配置,即使 source 的并行度跟后面整体的作业的并行度是一样的,也让其不与之后的算子 chain 在一起。
例子2:DAG 改变导致无法恢复。

这是一种比较特殊的情况,有一条 SQL (上图),可以看到 source 没有发生变化,之后的三个聚合互相之间没有关系,状态竟然也是无法恢复。
作业之所以无法恢复,是因为 operator ID 生成规则导致的。目前 SQL 中 operator ID 的生成的规则与上游、本身配置以及下游可以 chain 在一起的算子的数量都有关系。 因为新增指标,会导致新增一个 Calc 的下游节点,进而导致 operator ID 发生变化。
为了处理这种情况,支持了一种特殊的配置模式,允许用户配置生成 operator ID 的时候可以忽略下游 chain 在一起算子数量的条件。
例子3:新增聚合指标导致无法恢复
这块是用户诉求最大的,也是最复杂的部分。用户期望新增一些聚合指标后,原来的指标要能从 checkpoint 中恢复。

可以看到图中左部分是 SQL 生成的算子逻辑。count,sum,sum,count,distinct 会以一个 BaseRow 的结构存储在 ValueState 中。distinct 比较特殊一些,还会单独存储在一个 MapState 中。
这导致了如新增或者减少指标,都会使原先的状态没办法从 ValueState 中正常恢复,因为 VauleState 中存储的状态 “schema” 和新的(修改指标后)的 “schema”不匹配,无法正常反序列化。


在讨论解决方案之前,我们先回顾一下正常的恢复流。先从 checkpoint 中恢复出状态的 serializer,再通过 serializer 把状态恢复。接下来 operator 去注册新的状态定义,新的状态定义会和原先的状态定义进行一个兼容性对比,如果是兼容则状态恢复成功,如果不兼容则抛出异常任务失败。
不兼容的另一种处理情况是允许返回一个 migration(实现两个不匹配类型的状态恢复)那么也可以恢复成功。
针对上面的流程做出对应的修改:
- 第一步使新旧 serializer 互相知道对方的信息,添加一个接口,且修改了 statebackend resolve compatibility 的过程,把旧的信息传递给新的,并使其获取整个 migrate 过程。
- 第二步判断新老之间是否兼容,如果不兼容是否需要做一次 migration。然后让旧的 serializer 去恢复一遍状态,并使用新的 serializer 写入新的状态。
- 对 aggregation 的代码生成进行处理,当发现 aggregation 拿到的是指标是 null,那么将做一些初始化的工作。
通过以上的修改基本就可以做到正常的,新增的聚合指标从拆开的方案恢复。
三、流批一体探索
业务现状
字节跳动内部对流批一体和业务推广之前,技术团队提前做了大量技术方面的探索。整体判断是 SQL 这一层是可以做到流批一体的语义,但实践中却又发现不少不同。
比如说流计算的 session window,或是基于处理时间的 window,在批计算中无法做到。同时 SQL 在批计算中一些复杂的 over window,在流计算中也没有对应的实现。
但这些特别的场景可能只占 10% 甚至更少,所以用 SQL 去落实流批一体是可行的。

流批一体
这张图是比较常见的和大多数公司里的架构都类似。这种架构有什么缺陷呢?
- 数据不同源:批任务一般会有一次前置处理任务,不管是离线的也好实时的也好,预先进过一层加工后写入 Hive。而实时任务是从 kafka 读取原始的数据,可能是 json 格式,也可能是 avro 等等。直接导致批任务中可执行的 SQL 在流任务中没有结果生成或者执行结果不对。
- 计算不同源:批任务一般是 Hive + Spark 的架构,而流任务基本都是基于 Flink。不同的执行引擎在实现上都会有一些差异,导致结果不一致。不同的执行引擎有不同的 API 定义 UDF,它们之间也是无法被公用的。大部分情况下都是维护两套基于不同 API 实现的相同功能的 UDF。
鉴于上面的问题,提出了基于 Flink 的流批一体架构来解决。
- 数据不同源:流式处理先通过 Flink 处理之后写入 MQ 供下游流式 Flink job 去消费,对于批式处理由 Flink 处理后流式写入到 Hive,再由批式的 Flink job 去处理。
- 引擎不同源:既然都是基于 Flink 开发的流式,批式 job,自然没有计算不同源问题,同时也避免了维护多套相同功能的 UDF。
基于 Flink 实现的流批一体架构:

业务收益
- 统一的 SQL:通过一套 SQL 来表达流和批计算两种场景,减少开发维护工作。
- 复用 UDF:流式和批式计算可以共用一套 UDF。这对业务来说是有积极意义的。
- 引擎统一:对于业务的学习成本和架构的维护成本都会降低很多。
- 优化统一:大部分的优化都是可以同时作用在流式和批式计算上,比如对 planner、operator 的优化流和批可以共享。
四、未来工作和规划
优化 retract 放大问题

什么是 retract 放大?
上图有 4 张表,第一张表进行去重操作 (Dedup),之后分别和另外三张表做 Join。逻辑比较简单,表 A 输入(A1),最后产出 (A1,B1,C1,D1) 的结果。
当表 A 输入一个 A2,因为 Dedup 算子,导致数据需要去重,则向下游发送一个撤回 A1 的操作 -(A1) 和一个新增 A2 的操作 +(A2)。第一个 Join 算子收到 -(A1) 后会将 -(A1) 变成 -(A1,B1) 和 +(null,B1)(为了保持它认为的正确语义) 发送到下游。之后又收到了 +(A2) ,则又向下游发送 -(null,B1) 和 +(A2,B1) 这样操作就放大了两倍。再经由下游的算子操作会一直被放大,到最终的 sink 输出可能会被放大 1000 倍之多。

如何解决?
将原先 retract 的两条数据变成一条 changelog 的格式数据,在算子之间传递。算子接收到 changelog 后处理变更,然后仅仅向下游发送一个变更 changelog 即可。
未来规划

1.功能优化
- 支持所有类型聚合指标变更的 checkpoint 恢复能力
- window local-global
- 事件时间的 Fast Emit
- 广播维表
- 更多算子的 Mini-Batch 支持:维表,TopN,Join 等
- 全面兼容 Hive SQL 语法
2.业务扩展
- 进一步推动流式 SQL 达到 80%
- 探索落地流批一体产品形态
- 推动实时数仓标准化