Flink Operator State Storage, for Busy People
<h2 id="1">1. Preface</h2>
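I have recently had the chance to act as an interviewer. I noticed that many candidates know Keyed State is stored in RocksDB (when the StateBackend is set to RocksDBStateBackend), yet they also believe that Operator State is stored in RocksDB. This includes some people who have read this part of the source code, and even some who are already publishing courses on it.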
<h2 id="2">2. In Detail</h2>
<h3 id="2.1">2.1 Conclusion</h3>
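How Operator State is stored does not depend on the StateBackend we configure, and Operator State is never stored in RocksDB. It is first kept in memory (on the JVM heap). At checkpoint time it is written out as a stream directly into a file, which can be a local file or a file on HDFS.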
<h3 id="2.2">2.2 Classes related to operator state</h3>
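These classes live in the flink-runtime module. The ones that matter for the walkthrough below are OperatorStateBackend and its implementation DefaultOperatorStateBackend, DefaultOperatorStateBackendBuilder, OperatorStateRestoreOperation, DefaultOperatorStateBackendSnapshotStrategy, PartitionableListState and HeapBroadcastState.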
<h3 id="2.3">2.3 A concrete walkthrough using FlinkKafkaConsumerBase</h3>
First, let's look at AbstractStateBackend:
```java
@Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry) throws IOException;

@Override
public abstract OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier,
    @Nonnull Collection<OperatorStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry) throws Exception;
```
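There are two factory methods. createKeyedStateBackend returns an AbstractKeyedStateBackend, which has two subclasses: HeapKeyedStateBackend and RocksDBKeyedStateBackend.

createOperatorStateBackend, by contrast, returns an OperatorStateBackend, which has only one implementation: DefaultOperatorStateBackend.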
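The split can be shown with a toy model in plain Java. Every type in it is hypothetical and none of them are Flink classes. This includes `StateBackendSketch`, `HeapOperatorBackendSketch`, `FsBackendSketch` and `RocksDBBackendSketch`. The model only mirrors the shape described above: each backend picks its own keyed backend, but every backend hands out the same heap-based operator backend type.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types exist in Flink.
interface KeyedBackendSketch {
    String storage();
}

final class HeapKeyedBackendSketch implements KeyedBackendSketch {
    public String storage() { return "JVM heap"; }
}

final class RocksDBKeyedBackendSketch implements KeyedBackendSketch {
    public String storage() { return "RocksDB"; }
}

// The single operator backend type: named lists kept on the heap.
final class HeapOperatorBackendSketch {
    final Map<String, List<Object>> registeredOperatorStates = new HashMap<>();

    String storage() { return "JVM heap"; }
}

abstract class StateBackendSketch {
    abstract KeyedBackendSketch createKeyedStateBackend();

    abstract HeapOperatorBackendSketch createOperatorStateBackend();
}

// Hypothetical stand-in for a heap/filesystem backend.
final class FsBackendSketch extends StateBackendSketch {
    KeyedBackendSketch createKeyedStateBackend() { return new HeapKeyedBackendSketch(); }

    HeapOperatorBackendSketch createOperatorStateBackend() { return new HeapOperatorBackendSketch(); }
}

// Hypothetical stand-in for a RocksDB backend: only the keyed side differs.
final class RocksDBBackendSketch extends StateBackendSketch {
    KeyedBackendSketch createKeyedStateBackend() { return new RocksDBKeyedBackendSketch(); }

    HeapOperatorBackendSketch createOperatorStateBackend() { return new HeapOperatorBackendSketch(); }
}
```

Switching the backend changes where keyed state lives, but the operator state in this model always ends up in the same heap map.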
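When FlinkKafkaConsumerBase restores from a checkpoint, the implementations of createOperatorStateBackend show that the first thing to run is the build method of DefaultOperatorStateBackendBuilder. This holds for RocksDBStateBackend as well: its createOperatorStateBackend also just builds a DefaultOperatorStateBackend through this builder.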
```java
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
    Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
    Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = new HashMap<>();
    CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
    AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
        new DefaultOperatorStateBackendSnapshotStrategy(
            userClassloader,
            asynchronousSnapshots,
            registeredOperatorStates,
            registeredBroadcastStates,
            cancelStreamRegistryForBackend);
    OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
        cancelStreamRegistry,
        userClassloader,
        registeredOperatorStates,
        registeredBroadcastStates,
        restoreStateHandles
    );
    try {
        // OperatorState restore
        restoreOperation.restore();
    } catch (Exception e) {
        IOUtils.closeQuietly(cancelStreamRegistryForBackend);
        throw new BackendBuildingException("Failed when trying to restore operator state backend", e);
    }
    // after restoreOperation.restore() has run, pass the populated registeredOperatorStates to DefaultOperatorStateBackend
    return new DefaultOperatorStateBackend(
        executionConfig,
        cancelStreamRegistryForBackend,
        registeredOperatorStates,
        registeredBroadcastStates,
        new HashMap<>(),
        new HashMap<>(),
        snapshotStrategy
    );
}
```
Now let's look at what restore actually does:
```java
@Override
public Void restore() throws Exception {
    if (stateHandles.isEmpty()) {
        return null;
    }
    for (OperatorStateHandle stateHandle : stateHandles) {
        if (stateHandle == null) {
            continue;
        }
        // open the file stream behind this state handle
        FSDataInputStream in = stateHandle.openInputStream();
        closeStreamOnCancelRegistry.registerCloseable(in);
        ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userClassloader);
            OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(userClassloader);
            backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
            List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
                backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
            // Recreate all PartitionableListStates from the meta info
            for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
                final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
                    new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
                if (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                    // must fail now if the previous typeSerializer cannot be restored because there is no typeSerializer
                    // capable of reading previous state
                    // TODO when eager state registration is in place, we can try to get a convert deserializer
                    // TODO from the newly registered typeSerializer instead of simply failing here
                    throw new IOException("Unable to restore operator state [" + restoredSnapshot.getName() + "]." +
                        " The previous typeSerializer of the operator state must be present; the typeSerializer could" +
                        " have been removed from the classpath, or its implementation have changed and could" +
                        " not be loaded. This is a temporary restriction that will be fixed in future versions.");
                }
                PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName());
                if (null == listState) {
                    listState = new PartitionableListState<>(restoredMetaInfo);
                    registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
                } else {
                    // TODO with eager state registration in place, check here for typeSerializer migration strategies
                }
            }
            // ... and then get back the broadcast state.
            List<StateMetaInfoSnapshot> restoredBroadcastMetaInfoSnapshots =
                backendSerializationProxy.getBroadcastStateMetaInfoSnapshots();
            for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
                final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
                    new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);
                if (restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
                    restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {
                    // must fail now if the previous typeSerializer cannot be restored because there is no typeSerializer
                    // capable of reading previous state
                    // TODO when eager state registration is in place, we can try to get a convert deserializer
                    // TODO from the newly registered typeSerializer instead of simply failing here
                    throw new IOException("Unable to restore broadcast state [" + restoredSnapshot.getName() + "]." +
                        " The previous key and value serializers of the state must be present; the serializers could" +
                        " have been removed from the classpath, or their implementations have changed and could" +
                        " not be loaded. This is a temporary restriction that will be fixed in future versions.");
                }
                BackendWritableBroadcastState<?, ?> broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName());
                if (broadcastState == null) {
                    broadcastState = new HeapBroadcastState<>(restoredMetaInfo);
                    registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), broadcastState);
                } else {
                    // TODO with eager state registration in place, check here for typeSerializer migration strategies
                }
            }
            // Restore all the states
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
                stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                final String stateName = nameToOffsets.getKey();
                PartitionableListState<?> listStateForName = registeredOperatorStates.get(stateName);
                if (listStateForName == null) {
                    BackendWritableBroadcastState<?, ?> broadcastStateForName = registeredBroadcastStates.get(stateName);
                    Preconditions.checkState(broadcastStateForName != null, "Found state without " +
                        "corresponding meta info: " + stateName);
                    deserializeBroadcastStateValues(broadcastStateForName, in, nameToOffsets.getValue());
                } else {
                    deserializeOperatorStateValues(listStateForName, in, nameToOffsets.getValue());
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(restoreClassLoader);
            if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
                IOUtils.closeQuietly(in);
            }
        }
    }
    return null;
}
```
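The comments explain most of it. For each state handle, restore opens the checkpoint file as a stream and reads the serialized meta info (MetaInfo). It then recreates the corresponding PartitionableListState or HeapBroadcastState objects and deserializes the values stored at each recorded offset into them. All of this happens on the heap. The resulting registeredOperatorStates map is then passed to DefaultOperatorStateBackend.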
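The core of the "Restore all the states" loop can be sketched without Flink. The following is a simplified, hypothetical version, and `RestoreSketch` is not a Flink class. It makes three assumptions:

- Every element is a single `long`.
- A byte array stands in for the `FSDataInputStream`.
- The meta info that Flink reads through `OperatorBackendSerializationProxy` has already been reduced to a map from state name to element offsets.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the restore loop; not Flink's API.
final class RestoreSketch {

    // checkpointBytes: stand-in for the checkpoint file contents
    // nameToOffsets:   state name -> start offset of each element (assumed already decoded)
    static Map<String, List<Long>> restore(byte[] checkpointBytes,
                                           Map<String, long[]> nameToOffsets) throws IOException {
        // plays the role of registeredOperatorStates: every restored value lands in a heap list
        Map<String, List<Long>> registeredOperatorStates = new HashMap<>();
        for (Map.Entry<String, long[]> entry : nameToOffsets.entrySet()) {
            List<Long> listState =
                registeredOperatorStates.computeIfAbsent(entry.getKey(), k -> new ArrayList<>());
            for (long offset : entry.getValue()) {
                int start = (int) offset;
                // "seek" to the element, then deserialize exactly one value
                try (DataInputStream in = new DataInputStream(
                        new ByteArrayInputStream(checkpointBytes, start, checkpointBytes.length - start))) {
                    listState.add(in.readLong());
                }
            }
        }
        return registeredOperatorStates;
    }
}
```

Nothing here touches RocksDB. The restored values end up in ordinary heap lists, just as the restored PartitionableListStates end up in registeredOperatorStates.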
Next, FlinkKafkaConsumerBase obtains its state in initializeState:
```java
public final void initializeState(FunctionInitializationContext context) throws Exception {
    // .....
    this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
        OFFSETS_STATE_NAME,
        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
    // .....
}
```
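It first obtains a UnionListState from the OperatorStateStore, and OperatorStateStore has only one implementation: DefaultOperatorStateBackend. The source shows that the state is looked up directly in registeredOperatorStates, and what is ultimately returned is a PartitionableListState.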
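That lookup can be sketched as a minimal get-or-create over a plain `Map` of heap lists, used here in place of `PartitionableListState`. `OperatorStateStoreSketch` is hypothetical. The real `DefaultOperatorStateBackend` also records the distribution mode (union, for `getUnionListState`) and checks serializer compatibility, and this sketch leaves both out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lookup in DefaultOperatorStateBackend; not Flink's API.
final class OperatorStateStoreSketch {
    // Plays the role of registeredOperatorStates: filled by restore, or empty on a fresh start.
    private final Map<String, List<Object>> registeredOperatorStates;

    OperatorStateStoreSketch(Map<String, List<Object>> restoredStates) {
        this.registeredOperatorStates = restoredStates;
    }

    // Returns the list already registered under this name (e.g. restored from a checkpoint),
    // or registers and returns a new empty one. Mode tracking and serializer checks are omitted.
    List<Object> getUnionListState(String name) {
        return registeredOperatorStates.computeIfAbsent(name, k -> new ArrayList<>());
    }
}
```

After a restore, the function receives the same heap object that the restore step filled. On a fresh start it receives a new empty list.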
Then, when snapshotState runs:
```java
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    // ...
    unionOffsetStates.add(
        Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
    // ...
}
```
unionOffsetStates.add actually calls the add method of PartitionableListState:
```java
@Override
public void add(S value) {
    Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    internalList.add(value);
}
```
Here internalList is simply an ArrayList.
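Adding to Operator State is therefore just an in-memory list append, with no serialization and no I/O. Below is a hypothetical heap-only equivalent. `HeapListStateSketch` is not a Flink class, and `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`, since both reject null with a `NullPointerException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical heap-only list state mirroring the add() shown above; not Flink's class.
final class HeapListStateSketch<S> {
    // A plain ArrayList on the JVM heap: add() does no serialization and no I/O.
    private final List<S> internalList = new ArrayList<>();

    void add(S value) {
        // reject nulls with an NPE, like Preconditions.checkNotNull
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        internalList.add(value);
    }

    Iterable<S> get() {
        return internalList;
    }

    int size() {
        return internalList.size();
    }
}
```

This differs from keyed state in RocksDB, where each write is serialized into the store. Here serialization is deferred until the snapshot writes the list out.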
At checkpoint time, the snapshot method of DefaultOperatorStateBackendSnapshotStrategy is then called:
```java
// ...
PartitionableListState<?> value = entry.getValue();
// write the operator state into the checkpoint stream (e.g. a file on HDFS)
long[] partitionOffsets = value.write(localOut);
// ...
```
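Writing a PartitionableListState to the file in turn calls PartitionableListState's own write method. That method records the stream position of each element before serializing it: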
```java
public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
}
```
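The snapshot side can be sketched the same way. `SnapshotSketch` is hypothetical and rests on three assumptions:

- Elements are `long`s.
- A `ByteArrayOutputStream` stands in for the checkpoint file stream.
- The meta info header that Flink writes before the values is skipped.

As in `write` above, it records the stream position before each element, using `DataOutputStream.size()` in place of `out.getPos()`. Those offsets are what a restore later seeks back to.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the snapshot path; not Flink's API.
final class SnapshotSketch {
    final byte[] bytes;                      // stand-in for the checkpoint file contents
    final Map<String, long[]> nameToOffsets; // state name -> start offset of each element

    private SnapshotSketch(byte[] bytes, Map<String, long[]> nameToOffsets) {
        this.bytes = bytes;
        this.nameToOffsets = nameToOffsets;
    }

    // Streams every heap list into one output, recording where each element starts.
    static SnapshotSketch snapshot(Map<String, List<Long>> registeredOperatorStates) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        Map<String, long[]> nameToOffsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> entry : registeredOperatorStates.entrySet()) {
            List<Long> internalList = entry.getValue();
            long[] partitionOffsets = new long[internalList.size()];
            for (int i = 0; i < internalList.size(); ++i) {
                // like out.getPos(): bytes written so far = start of element i
                partitionOffsets[i] = out.size();
                out.writeLong(internalList.get(i));
            }
            nameToOffsets.put(entry.getKey(), partitionOffsets);
        }
        out.flush();
        return new SnapshotSketch(buffer.toByteArray(), nameToOffsets);
    }
}
```

The heap lists are serialized only at this point, straight into a stream. In real Flink that stream goes to a local or HDFS checkpoint file rather than into RocksDB.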