关于使用Flink RocksDB状态后端时一定要写MapSta

2023-10-29  本文已影响0人  LittleMagic

前言

抱歉起这种烂大街的日本轻小说风格标题来吸引注意力。原本我认为这是常识,不需要专门写一篇文章来讲解如此细碎的点。但是在最近工作巡检中发现了越来越多如同ValueState<Map>的状态用法(当然大部分是历史遗留),部分Flink作业深受性能问题困扰,所以还是抽出点时间快速聊一聊,顺便给出不算优雅但还算有效的挽救方案。

基于RocksDB的状态序列化

我们已经知道,RocksDB是基于二进制流的内嵌K-V存储,所以Flink任务使用RocksDB状态后端时,写/读操作的状态数据都需要经过序列化和反序列化,从而利用TaskManager本地磁盘实现海量的状态存储。

举个栗子,RocksDBValueState的取值和更新方法如下:

class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
       implements InternalValueState<K, N, V> {
   @Override
   public TypeSerializer<K> getKeySerializer() {
       return backend.getKeySerializer();
   }

   @Override
   public TypeSerializer<N> getNamespaceSerializer() {
       return namespaceSerializer;
   }

   @Override
   public TypeSerializer<V> getValueSerializer() {
       return valueSerializer;
   }

   @Override
   public V value() {
       try {
           byte[] valueBytes =
                   backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());

           if (valueBytes == null) {
               return getDefaultValue();
           }
           dataInputView.setBuffer(valueBytes);
           return valueSerializer.deserialize(dataInputView);
       } catch (IOException | RocksDBException e) {
           throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
       }
   }

   @Override
   public void update(V value) {
       if (value == null) {
           clear();
           return;
       }

       try {
           backend.db.put(
                   columnFamily,
                   writeOptions,
                   serializeCurrentKeyWithGroupAndNamespace(),
                   serializeValue(value));
       } catch (Exception e) {
           throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
       }
   }
}

可见key和value都需要经过对应类型的TypeSerializer的处理,即如果将状态声明为ValueState<Map<K, V>>,那么将由MapSerializer<K, V>负责值的正反序列化。特别注意,serializeCurrentKeyWithGroupAndNamespace()方法中,key需要加上它所对应的KeyGroup编号和对应的Namespace(Namespace是窗口信息),形成一个复合key,即:CompositeKey(KG, K, NS),RocksDB实际存储的状态数据的key都类似如此。具体可参看SerializedCompositeKeyBuilder类,不再赘述。

接下来再看一下RocksDBMapState的部分实现。

class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
        implements InternalMapState<K, N, UK, UV> {
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    @Override
    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return (rawValueBytes == null
                ? null
                : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
    }

    @Override
    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }

    @Override
    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }

        try (RocksDBWriteBatchWrapper writeBatchWrapper =
                new RocksDBWriteBatchWrapper(
                        backend.db, writeOptions, backend.getWriteBatchSize())) {
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                byte[] rawKeyBytes =
                        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
                                entry.getKey(), userKeySerializer);
                byte[] rawValueBytes =
                        serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    @Override
    public Iterable<Map.Entry<UK, UV>> entries() {
        return this::iterator;
    }

    @Override
    public Iterator<Map.Entry<UK, UV>> iterator() {
        final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

        return new RocksDBMapIterator<Map.Entry<UK, UV>>(
                backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
            @Override
            public Map.Entry<UK, UV> next() {
                return nextEntry();
            }
        };
    }

由于MapState的本身有用户定义的key UK,所以RocksDB存储它时,会在上文所述的复合key后面,再加上UK的值,即:CompositeKey(KG, K, NS) :: UK。这样,同属于一个KeyContext的所有用户键值对就存在一个连续的存储空间内,可以通过RocksDB WriteBatch机制攒批,实现批量写(putAll()方法),也可以通过RocksDB Iterator机制做前缀扫描,实现批量读(entries()方法)。

问题的症结

代码读完了。假设我们在某个key下有5条数据的状态,若使用ValueState<Map<String, String>>来存储,按照MapSerializer的序列化方式,其存储可以记为:

(1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]

注意对于无窗口上下文的状态,NS为VoidNamespace。且序列化Map时,会加上Map的大小,以及表示每个value是否为NULL的标记。

如果使用MapState<String, String>存储,可以记为:

(1, k, VoidNamespace) :: k1 -> v1
(1, k, VoidNamespace) :: k2 -> v2
(1, k, VoidNamespace) :: k3 -> NULL
(1, k, VoidNamespace) :: k4 -> v4
(1, k, VoidNamespace) :: k5 -> v5

如果我们获取或修改一条状态数据,前者需要将所有数据做一遍序列化和反序列化,而后者只需要处理一条。在Map比较小的情况下可能没有明显的性能差异,但是如果Map有几十个甚至上百个键值对,或者某些value的长度很长(如各类打标标记串等),ValueState<Map>的性能退化就会非常严重,造成反压。

有的同学可能会问:我对状态数据的操作基本都是“整存整取”(即读/写整个Map),也不建议使用ValueState<Map>吗?答案仍然是不建议。除了前面提到的WriteBatch和Iterator为MapState带来的优化之外,RocksDB更可以利用多线程进行读写,而单个大value不仅不能享受这个便利,还会挤占Block Cache空间,在出现数据倾斜等场景时,磁盘I/O可能会打到瓶颈。所以,我们在开始编写作业时就应该正确使用MapState

平滑迁移

为了消除此类状态误用的影响,常见的重构方式是将ValueState<Map>修改为MapState,重置位点后消费历史数据,积攒状态,并替换掉旧任务。但是对于状态TTL较长、size较大的场景(例如物流监控场景经常有30天TTL、十几TB大的State),这样显然非常不方便,下面提供一种简单的平滑迁移方式。

假设原本误用的状态为mainState,我们声明两个新的状态,一个是新的MapState newMainState,一个是布尔型ValueState isMigratedState,表示该key对应的状态是否已经迁移成了新的,即:

    private transient ValueState<Map<String, String>> mainState;

    private transient ValueState<Boolean> isMigratedState;
    private transient MapState<String, String> newMainState;    

当然,它们的TTL等参数要完全相同。

写两个新的方法,负责在读写mainState时将其迁移成newMainState,并做上相应的标记。不存在历史状态的,直接以新格式存储。再强调一遍,newMainState.entries()newMainState.putAll()的性能很不错,不必过于担心。

    private Map<String, String> wrapGetMainState() throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        Map<String, String> result = new HashMap<>();
        for (Entry<String, String> e : newMainState.entries()) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    private void wrapUpdateMainState(Map<String, String> data) throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        newMainState.putAll(data);
    }

再将历史代码中的状态访问全部替换成wrapGetMainState()wrapUpdateMainState()方法的调用即可。表面上看是由一个状态句柄变成了两个状态句柄,但是标记状态的访问十分轻量级,且随着程序的运行,旧状态的数据渐进式地替换完毕之后,就可以安全地删除mainStateisMigratedState了。当然,托管内存的设置要科学,并添加一些有利于RocksDB状态吞吐量的参数,如:

state.backend.rocksdb.predefined-options  SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.memory.partitioned-index-filters  true

基于堆的状态呢?

与RocksDB相反,基于堆的JobManager和FileSystem状态后端无需序列化和反序列化,当然状态的大小就要受制于TaskManager内存。不过,如果我们采用这两种状态后端,ValueState<Map>MapState也就没有明显的性能差别了,因为HeapValueStateHeapMapState的底层都是相同的,即CopyOnWriteStateTable,本质上是内存中的状态映射表。读者有兴趣可以自行参考对应的Flink源码,这里不再啰嗦了。

ValueState、ListState、MapState三者在RocksDB状态后端和基于堆的状态后端中的异同点可以概括成下表。

The End

上一篇下一篇

猜你喜欢

热点阅读