flink rocksdbStateBackend
比较直观的方式是打开 RocksDB 的 native metrics ,在默认使用 Flink managed memory 方式的情况下,
state.backend.rocksdb.metrics.block-cache-usage ,
state.backend.rocksdb.metrics.mem-table-flush-pending,
state.backend.rocksdb.metrics.num-running-compactions
以及 state.backend.rocksdb.metrics.num-running-flushes 是比较重要的相关 metrics。
Flink-1.10 之后,由于引入了 RocksDB 的内存托管机制,在绝大部分情况下, RocksDB 的这一部分 native 内存是可控的,不过受限于 RocksDB 的相关 cache 实现限制(这里暂不展开,后续会有文章讨论),在某些场景下,无法做到完美控制,这时候建议打开上文提到的 native metrics,观察相关 block cache 内存使用是否存在超用情况,可以将相关内存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空间给 native 内存使用
在 RocksDB 中,每个 state 独享一个 Column Family,而每个 Column family 使用各自独享的 write buffer 和 block cache,上图中的 window state 和 value state实际上分属不同的 column family。
从Flink-1.10 开始引入的 managed memory [1][2],会将slot上的RocksDB的实际可用内存限制在 managed memory / number of slots,也就是说对于你配置的10个slot,20GB的process memory,0.75的managed fraction,真实的per slot managed memory其实只有不到1.5GB,也就是说你配置的write buffer count以及max write buffer啥的并没有真正“生效”。RocksDB的write buffer manager会提前将write buffer 置为immutable并flush出去。应该增大 managed memory / number of slots 来增大单个slot内多个RocksDB的共享可用内存
rocksDB 使用优势:
1、在1.10版本及以后,由于 TaskManager 内存模型重构,RocksDB 内存默认成为了堆外托管内存的一部分,可以免去一些手动调整的麻烦。如果性能仍然不佳,需要干预,则必须将 state.backend.rocksdb.memory.managed 参数设为 false 来禁用 RocksDB 内存托管
2、不受 Java 垃圾回收的影响,与 heap 对象相比,它的内存开销更低,并且是目前唯一支持增量检查点(incremental checkpointing)的选项。
3、使用 RocksDB,状态大小仅受限于本地可用的磁盘空间大小,这很适合 state 特别大的 Flink 作业
4、RocksDBStateBackend:无需担心 OOM 风险,是大部分时候的选择。
参数调优:
Tuning MemTable:
以下是一些值得注意的参数。为方便对比,下文都会将 RocksDB 的原始参数名与 Flink 配置中的参数名一并列出,用竖线分割。
write_buffer_size | state.backend.rocksdb.writebuffer.size
单个 memtable 的大小,默认是64MB。当 memtable 大小达到此阈值时,就会被标记为不可变。一般来讲,适当增大这个参数可以减小写放大带来的影响,但同时会增大 flush 后 L0、L1 层的压力,所以还需要配合修改 compaction 参数,后面再提。
max_write_buffer_number | state.backend.rocksdb.writebuffer.count
memtable 的最大数量(包含活跃的和不可变的),默认是2。当全部 memtable 都写满但是 flush 速度较慢时,就会造成写停顿,所以如果内存充足或者使用的是机械硬盘,建议适当调大这个参数,如4。
min_write_buffer_number_to_merge | state.backend.rocksdb.writebuffer.number-to-merge
在 flush 发生之前被合并的 memtable 最小数量,默认是1。举个例子,如果此参数设为2,那么当有至少两个不可变 memtable 时,才有可能触发 flush(亦即如果只有一个不可变 memtable,就会等待)。调大这个值的好处是可以使更多的更改在 flush 前就被合并,降低写放大,但同时又可能增加读放大,因为读取数据时要检查的 memtable 变多了。经测试,该参数设为2或3相对较好。
Tuning Block/Block Cache:
block 是 sstable 的基本存储单位。block cache 则扮演读缓存的角色,采用 LRU 算法存储最近使用的 block,对读性能有较大的影响。
block_size | state.backend.rocksdb.block.blocksize
block 的大小,默认值为4KB。在生产环境中总是会适当调大一些,一般32KB比较合适,对于机械硬盘可以再增大到128~256KB,充分利用其顺序读取能力。但是需要注意,如果 block 大小增大而 block cache 大小不变,那么缓存的 block 数量会减少,无形中会增加读放大。
block_cache_size | state.backend.rocksdb.block.cache-size
block cache 的大小,默认为8MB。由上文所述的读写流程可知,较大的 block cache 可以有效避免热数据的读请求落到 sstable 上,所以若内存余量充足,建议设置到128MB甚至256MB,读性能会有非常明显的提升。
Tuning Compaction
ompaction 在所有基于 LSM Tree 的存储引擎中都是开销最大的操作,弄不好的话会非常容易阻塞读写。建议看官先读读前面那篇关于 RocksDB 的 compaction 策略的文章,获取一些背景知识,这里不再赘述。
compaction_style | state.backend.rocksdb.compaction.style
compaction 算法,使用默认的 LEVEL(即 leveled compaction)即可,下面的参数也是基于此。
target_file_size_base | state.backend.rocksdb.compaction.level.target-file-size-base
L1层单个 sstable 文件的大小阈值,默认值为64MB。每向上提升一级,阈值会乘以因子 target_file_size_multiplier(但默认为1,即每级sstable最大都是相同的)。显然,增大此值可以降低 compaction 的频率,减少写放大,但是也会造成旧数据无法及时清理,从而增加读放大。此参数不太容易调整,一般不建议设为256MB以上。
max_bytes_for_level_base | state.backend.rocksdb.compaction.level.max-size-level-base
L1层的数据总大小阈值,默认值为256MB。每向上提升一级,阈值会乘以因子 max_bytes_for_level_multiplier(默认值为10)。由于上层的大小阈值都是以它为基础推算出来的,所以要小心调整。建议设为 target_file_size_base 的倍数,且不能太小,例如5~10倍。
level_compaction_dynamic_level_bytes | state.backend.rocksdb.compaction.level.use-dynamic-size
这个参数之前讲过。当开启之后,上述阈值的乘法因子会变成除法因子,能够动态调整每层的数据量阈值,使得较多的数据可以落在最高一层,能够减少空间放大,整个 LSM Tree 的结构也会更稳定。对于机械硬盘的环境,强烈建议开启。
Generic Parameters
max_open_files | state.backend.rocksdb.files.open
顾名思义,是 RocksDB 实例能够打开的最大文件数,默认为-1,表示不限制。由于sstable的索引和布隆过滤器默认都会驻留内存,并占用文件描述符,所以如果此值太小,索引和布隆过滤器无法正常加载,就会严重拖累读取性能。
max_background_compactions/max_background_flushes | state.backend.rocksdb.thread.num
后台负责 flush 和 compaction 的最大并发线程数,默认为1。注意 Flink 将这两个参数合二为一处理(对应 DBOptions.setIncreaseParallelism() 方法),鉴于 flush 和 compaction 都是相对重的操作,如果 CPU 余量比较充足,建议调大,在我们的实践中一般设为4。
state.backend.rocksdb.thread.num
后台 flush 和 compaction 的线程数. 默认值 ‘1‘. 建议调大
state.backend.rocksdb.writebuffer.count
每个 column family 的 write buffer 数目,默认值 ‘2‘. 如果有需要可以适当调大
state.backend.rocksdb.writebuffer.size
每个 write buffer 的 size,默认值‘64MB‘. 对于写频繁的场景,建议调大
state.backend.rocksdb.block.cache-size
每个 column family 的 block cache大小,默认值‘8MB’,如果存在重复读的场景,建议调大
state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法,具体代码片段如下:
管理 RocksDB 内存的 3 种配置
1.block_cache_size 的配置
此配置最终将控制内存中缓存的最大未压缩块数。随着块数的不断增加,内存大小也会增加。因此,通过预先配置,您可以保持固定的内存消耗水平。
2.write_buffer_size 的配置
这种配置控制着 RocksDB 中 MemTable 的最大值。活跃 MemTables 和只读的 MemTables 最终会影响 RocksDB 中的内存大小,所以提前调整可能会在以后为您避免一些麻烦。
3.max_write_buffer_number 的配置
在 RocksDB 将 MemTables 导出到磁盘上的 SSTable 之前,此配置决定并控制着内存中保留的 MemTables 的最大数量。这实际上是内存中“只读内存表“的最大数量。
https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA -- 调参
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
rocksDB 读取机制
RocksDB 中的读取操作首先访问活动内存表(Active Memory Table)来反馈查询。如果找到待查询的 key,则读取操作将由新到旧依次访问,直到找到待查询的 key 为止。如果在任何 MemTable 中都找不到目标 key,那么 READ 操作将访问 SSTables,再次从最新的开始。
SSTables 文件可以:
优先去 RocksDB 的 BlockCache 读取;
如果 BlockCache 没有的话,就去读操作系统的文件,这些文件块又可能被操作系统缓存了;
最差的情况就是去本地磁盘读取;
SST 级别的 bloom filter 策略可以避免大量的磁盘访问。
JVM Overhead 配置大小不够。这个默认大小是 TM 大小的 10%,但是不会超过 1G。你的情况是 TM
的总内存比较大,可以尝试调大一点。相关配置项:taskmanager.memory.jvm-overhead.[min|max|fraction]
- UDF 中使用了 native memory,可能是用户代码,也可能是依赖的第三方库。这种属于 task off-heap 内存,默认大小是
0,相关配置项:taskmanager.memory.task.off-heap.size - 如果使用了 RocksDBStateBackend,也有可能 RocksDB 的内存超用。Flink 会设置 RocksDB
使用的缓存大小为 managed memory 大小,但是我们发现 RocksDB 存在缺陷,在极个别情况下有可能会限制不住。可以尝试关闭
RocksDB 的内存控制,这样 RocksDB 会使用默认缓存大小,不会随着 Flink TM
的增大而增大。配置项:state.backend.rocksdb.memory.managed