简析Flink状态生存时间(State TTL)机制的底层实现
前言
很久没写过源码走读类型的文章了。最近在做业务需求时用Flink的State TTL非常多,今天就来探索一下吧。
从Flink 1.6版本开始,社区为状态引入了TTL(time-to-live,生存时间)机制,支持Keyed State的自动过期,有效解决了状态数据在无干预情况下无限增长导致OOM的问题。State TTL的用法很简单,官方文档中给出的示例代码如下。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
那么State TTL的背后又隐藏着什么样的思路呢?下面就从设置类StateTtlConfig入手开始研究(Flink代码版本为1.9.3)。
StateTtlConfig
该类中有5个成员属性,它们就是用户需要指定的全部参数了。
private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
其中,ttl参数表示用户设定的状态生存时间。而UpdateType、StateVisibility和TtlTimeCharacteristic都是枚举,分别代表状态时间戳的更新方式、过期状态数据的可见性,以及对应的时间特征。它们的含义在注释中已经解释得很清楚了。
/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum UpdateType {
/** TTL is disabled. State does not expire. */
Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
OnReadAndWrite
}
/**
* This option configures whether expired user value can be returned or not.
*/
public enum StateVisibility {
/** Return expired user value if it is not cleaned up yet. */
ReturnExpiredIfNotCleanedUp,
/** Never return expired user value. */
NeverReturnExpired
}
/**
* This option configures time scale to use for ttl.
*/
public enum TtlTimeCharacteristic {
/** Processing time, see also <code>org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>. */
ProcessingTime
}
Flink目前仅支持基于处理时间的State TTL,事件时间会在不久的将来支持。
CleanupStrategies内部类则用来规定过期状态的特殊清理策略,用户在构造StateTtlConfig时,可以通过调用以下方法之一指定。
-
cleanupFullSnapshot()
当对状态做全量快照时清理过期数据,对开启了增量检查点(incremental checkpoint)的RocksDB状态后端无效,对应源码中的EmptyCleanupStrategy。
为什么叫做“空的”清理策略呢?因为该选项只能保证状态持久化时不包含过期数据,但TaskManager本地的过期状态则不作任何处理,所以无法从根本上解决OOM的问题,需要定期重启作业。 -
cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord)
增量清理过期数据,默认在每次访问状态时进行清理,将runCleanupForEveryRecord设为true可以附加在每次写入/删除时清理。cleanupSize指定每次触发清理时检查的状态条数。
仅对基于堆的状态后端有效,对应源码中的IncrementalCleanupStrategy。 -
cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
当RocksDB做compaction操作时,通过Flink定制的过滤器(FlinkCompactionFilter)过滤掉过期状态数据。参数queryTimeAfterNumEntries用于指定在写入多少条状态数据后,通过状态时间戳来判断是否过期。
该策略仅对RocksDB状态后端有效,对应源码中的RocksdbCompactFilterCleanupStrategy。CompactionFilter是RocksDB原生提供的机制,其说明可见这里。
如果不调用上述方法,则采用默认的后台清理策略,下文有讲。
TtlStateFactory、TtlStateContext
在所有Keyed State状态后端的抽象基类AbstractKeyedStateBackend中,创建并记录一个状态实例的方法如下。
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
可见是调用了TtlStateFactory.createStateAndWrapWithTtlIfEnabled()方法来真正创建。顾名思义,TtlStateFactory是产生TTL状态的工厂类。
public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateBackend<K> stateBackend,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(stateBackend);
Preconditions.checkNotNull(timeProvider);
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
namespaceSerializer, stateDesc, stateBackend, timeProvider)
.createState() :
stateBackend.createInternalState(namespaceSerializer, stateDesc);
}
由上可知,如果我们为状态描述符StateDescriptor加入了TTL,那么就会调用TtlStateFactory.createState()方法创建一个带有TTL的状态实例;否则,就调用StateBackend.createInternalState()创建一个普通的状态实例。TtlStateFactory.createState()的代码如下。
@SuppressWarnings("unchecked")
private IS createState() throws Exception {
SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.get();
if (incrementalCleanup != null) {
incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
}
return state;
}
其中,stateFactories是一个Map结构,维护了各种状态描述符与对应产生该种状态对象的工厂方法映射。所有的工厂方法都被包装成了Supplier(Java 8提供的函数式接口),所以在上述createState()方法中,可以通过Supplier.get()方法来实际执行createTtl*State()工厂方法,并获得新的状态实例。
this.stateFactories = createStateFactories();
@SuppressWarnings("deprecation")
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
return Stream.of(
Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}
@SuppressWarnings("unchecked")
private <T> IS createListState() throws Exception {
ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
}
// 以下略去...
可见,带有TTL的状态类名其实就是普通状态类名加上Ttl前缀,只是没有公开给用户而已。并且在生成Ttl*State时,还会通过createTtlStateContext()方法生成TTL状态的上下文。
@SuppressWarnings("unchecked")
private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {
ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter config
OIS originalState = (OIS) stateBackend.createInternalState(
namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
return new TtlStateContext<>(
originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
}
TtlStateContext的本质是对以下几个实例做了封装。
- 原始State(通过StateBackend.createInternalState()方法创建)及其序列化器(通过StateDescriptor.getSerializer()方法取得);
- StateTtlConfig,前文已经讲过;
- TtlTimeProvider,用来提供判断状态过期标准的时间戳。当前只是简单地代理了System.currentTimeMillis(),没有任何其他代码;
- 一个Runnable类型的回调方法,通过registerTtlIncrementalCleanupCallback()方法产生,用于状态数据的增量清理,后面会看到它的用途。
接下来就具体看看TTL状态是如何实现的。
AbstractTtlState、AbstractTtlDecorator
在解说之前,先放一幅类图。
所有Ttl*State都是AbstractTtlState的子类,而AbstractTtlState又是装饰器AbstractTtlDecorator的子类。AbstractTtlDecorator提供了最基本的TTL逻辑,代码不长,全部抄录如下。
abstract class AbstractTtlDecorator<T> {
/** Wrapped original state handler. */
final T original;
final StateTtlConfig config;
final TtlTimeProvider timeProvider;
/** Whether to renew expiration timestamp on state read access. */
final boolean updateTsOnRead;
/** Whether to renew expiration timestamp on state read access. */
final boolean returnExpired;
/** State value time to live in milliseconds. */
final long ttl;
AbstractTtlDecorator(
T original,
StateTtlConfig config,
TtlTimeProvider timeProvider) {
// ......
}
<V> V getUnexpired(TtlValue<V> ttlValue) {
return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
}
<V> boolean expired(TtlValue<V> ttlValue) {
return TtlUtils.expired(ttlValue, ttl, timeProvider);
}
<V> TtlValue<V> wrapWithTs(V value) {
return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
}
<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
return wrapWithTs(ttlValue.getUserValue());
}
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
return ttlValue == null ? null : ttlValue.getUserValue();
}
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getter.get();
if (ttlValue == null) {
return null;
} else if (expired(ttlValue)) {
stateClear.run();
if (!returnExpired) {
return null;
}
} else if (updateTsOnRead) {
updater.accept(rewrapWithNewTs(ttlValue));
}
return ttlValue;
}
}
它的成员属性比较容易理解,例如,updateTsOnRead表示在读取状态值时也更新时间戳(即UpdateType.OnReadAndWrite),returnExpired表示即使状态过期,在被真正删除之前也返回它的值(即StateVisibility.ReturnExpiredIfNotCleanedUp)。
状态值与TTL的包装(成为TtlValue)以及过期检测都由工具类TtlUtils来负责,思路很简单,代码如下。
public class TtlUtils {
static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
return expired(ttlValue, ttl, timeProvider.currentTimestamp());
}
static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}
static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
return expired(ts, ttl, timeProvider.currentTimestamp());
}
public static boolean expired(long ts, long ttl, long currentTimestamp) {
return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
}
private static long getExpirationTimestamp(long ts, long ttl) {
long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
return ts + ttlWithoutOverflow;
}
static <V> TtlValue<V> wrapWithTs(V value, long ts) {
return new TtlValue<>(value, ts);
}
}
TtlValue的属性只有两个:状态值和时间戳,代码略去。
AbstractTtlDecorator核心方法是获取状态值的getWrappedWithTtlCheckAndUpdate(),它接受三个参数:
- getter:一个可抛出异常的Supplier,用于获取状态值;
- updater:一个可抛出异常的Consumer,用于更新状态的时间戳;
- stateClear:一个可抛出异常的Runnable,用于异步删除过期状态。
可见,在默认情况下的后台清理策略是:只有状态值被读取时,才会做过期检测,并异步清除过期的状态。这种惰性清理的机制会导致那些实际已经过期但从未被再次访问过的状态无法被删除,需要特别注意。官方文档中也已有提示:
By default, expired values are explicitly removed on read, such as ValueState#value, and periodically garbage collected in the background if supported by the configured state backend.
当确认到状态过期时,会调用stateClear的逻辑进行删除;如果需要在读取时顺便更新状态的时间戳,会调用updater的逻辑重新包装一个TtlValue。
AbstractTtlState的代码更加简单,主要的方法列举如下。
final Runnable accessCallback;
<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<T>, SE> getter,
ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
return getWithTtlCheckAndUpdate(getter, updater, original::clear);
}
@Override
public void clear() {
original.clear();
accessCallback.run();
}
其中,accessCallback就是TtlStateContext中注册的增量清理回调。
下面以TtlMapState为例,看看具体的TTL状态如何利用上文所述的这些实现。
TtlMapState
以下是部分代码。
class TtlMapState<K, N, UK, UV>
extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
implements InternalMapState<K, N, UK, UV> {
TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
super(ttlStateContext);
}
@Override
public UV get(UK key) throws Exception {
TtlValue<UV> ttlValue = getWrapped(key);
return ttlValue == null ? null : ttlValue.getUserValue();
}
private TtlValue<UV> getWrapped(UK key) throws Exception {
accessCallback.run();
return getWrappedWithTtlCheckAndUpdate(
() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}
@Override
public void put(UK key, UV value) throws Exception {
accessCallback.run();
original.put(key, wrapWithTs(value));
}
@Override
public void putAll(Map<UK, UV> map) throws Exception {
accessCallback.run();
if (map == null) {
return;
}
Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
long currentTimestamp = timeProvider.currentTimestamp();
for (Map.Entry<UK, UV> entry : map.entrySet()) {
UK key = entry.getKey();
ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
}
original.putAll(ttlMap);
}
@Override
public void remove(UK key) throws Exception {
accessCallback.run();
original.remove(key);
}
@Override
public boolean contains(UK key) throws Exception {
TtlValue<UV> ttlValue = getWrapped(key);
return ttlValue != null;
}
// ......
}
可见,TtlMapState的增删改查操作都是在原MapState上进行,只是加上了TTL相关的逻辑,这也是装饰器模式的特点。例如,TtlMapState.get()方法调用了上述AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate()方法,传入的获取(getter)、插入(updater)和删除(stateClear)的逻辑就是原MapState的get()、put()和remove()方法。而TtlMapState.put()只是在调用原MapState的put()方法之前,将状态包装为TtlValue而已。
增量清理策略
另外需要注意,所有增删改查操作之前都需要执行accessCallback.run()方法。如果启用了增量清理策略,该Runnable会通过在状态数据上维护一个全局迭代器向前清理过期数据。如果未启用增量清理策略,accessCallback为空。前文提到过的TtlStateFactory.registerTtlIncrementalCleanupCallback()方法如下。
private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
StateTtlConfig.IncrementalCleanupStrategy config =
ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
boolean cleanupConfigured = config != null && incrementalCleanup != null;
boolean isCleanupActive = cleanupConfigured &&
isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };
if (isCleanupActive && config.runCleanupForEveryRecord()) {
stateBackend.registerKeySelectionListener(stub -> callback.run());
}
return callback;
}
实际清理的代码则位于TtlIncrementalCleanup类中,stateIterator就是状态数据的迭代器。
void stateAccessed() {
initIteratorIfNot();
try {
runCleanup();
} catch (Throwable t) {
throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t
}
}
private void initIteratorIfNot() {
if (stateIterator == null || !stateIterator.hasNext()) {
stateIterator = ttlState.original.getStateIncrementalVisitor(cleanupSize);
}
}
private void runCleanup() {
int entryNum = 0;
Collection<StateEntry<K, N, S>> nextEntries;
while (
entryNum < cleanupSize &&
stateIterator.hasNext() &&
!(nextEntries = stateIterator.nextEntries()).isEmpty()) {
for (StateEntry<K, N, S> state : nextEntries) {
S cleanState = ttlState.getUnexpiredOrNull(state.getState());
if (cleanState == null) {
stateIterator.remove(state);
} else if (cleanState != state.getState()) {
stateIterator.update(state, cleanState);
}
}
entryNum += nextEntries.size();
}
}
RocksDB压缩过滤清理策略
如果启用了该策略,Flink会通过维护一个RocksDbTtlCompactFiltersManager实例来管理FlinkCompactionFilter过滤器。FlinkCompactionFilter并不是在Flink工程中维护的,而是位于Data Artisans为Flink专门维护的FRocksDB库内。FLINK-10471实现了FlinkCompactionFilter及其附属逻辑,主要为C++代码,通过JNI调用。对应的commit详见GitHub,这里就不班门弄斧了。关于RocksDB的compaction相关细节,笔者之前也写过一篇长文做了些分析。
The End
写的有些乱了,就酱吧。
民那晚安。