Flink 源码:TM 端恢复及创建 OperatorState
本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/6Oi_1tP-7Jns3ZguMW7wLg
在之前《StreamTask 初始化流程》的文章中,省略掉了 TM 端恢复 State 的详细过程,本文主要讲述:
- OperatorState 的恢复和创建流程
- Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来
一、 TM 端恢复 OperatorState 的流程
StateBackend 创建 OperatorStateBackend 时 TM 端会恢复 OperatorState。目前 Flink 支持的三种 StateBackend 都对应同一种 OperatorStateBackend,即:DefaultOperatorStateBackend,具体 new DefaultOperatorStateBackend 的过程由建造器 DefaultOperatorStateBackendBuilder 完成。
三种 StateBackend 的 createOperatorStateBackend 方法非常相似,源码如下:
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
stateHandles,
cancelStreamRegistry).build();
}
所有的初始化流程都在 DefaultOperatorStateBackendBuilder 类的 build 方法中,build 方法源码如下所示:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(XXX);
OperatorStateRestoreOperation restoreOperation =
new OperatorStateRestoreOperation(XXX);
try {
// OperatorState 恢复流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
return new DefaultOperatorStateBackend(XXX);
}
build 方法中除了构造了几个对象以外,重点执行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢复流程。
先介绍 OperatorStateRestoreOperation 类中两个重要的 Map:
- registeredOperatorStates 用于保存 StateName 和 ListState 的映射关系;
- registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射关系
restore 源码如下所示:
// OperatorStateRestoreOperation 类中两个重要的 Map
// 保存 StateName 和 ListState 的映射关系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
// 保存 StateName 和 BroadcastState 的映射关系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
// stateHandles 为空,表示没有要恢复的 State
if (stateHandles.isEmpty()) {
return null;
}
// 遍历所有 stateHandles
for (OperatorStateHandle stateHandle : stateHandles) {
// 通过 stateHandle 可以获取 InputStream 读取数据
FSDataInputStream in = stateHandle.openInputStream();
try {
List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
// 从元数据中创建 PartitionableListStates,并没有恢复真正的 State
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// registeredOperatorStates 中维护的 StateName 与 ListState 的映射关系
PartitionableListState<?> listState = registeredOperatorStates
.get(restoredSnapshot.getName());
// listState == null 表示当前 State 还未创建,则创建,并保存到 map 中
if (null == listState) {
// 这里只是依赖 MetaInfo 创建了 PartitionableListState,并没有恢复真正的 State 数据
listState = new PartitionableListState<>(restoredMetaInfo);
// 创建出的 State 数据 put 到 registeredOperatorStates 中
registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
}
}
// 真正恢复 State 的操作
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
final String stateName = nameToOffsets.getKey();
// 通过 StateName 从 registeredOperatorStates 中获取 ListState
// 因为之前已经根据元数据创建了 State,
// 所以这里 get 不到,只能是因为当前的 StateName 属于 BroadcastState
PartitionableListState<?> listStateForName =
registeredOperatorStates.get(stateName);
// listState 为 null,表示恢复 Broadcast 相关的 State
if (listStateForName == null) {
BackendWritableBroadcastState<?, ?> broadcastStateForName =
registeredBroadcastStates.get(stateName);
deserializeBroadcastStateValues(broadcastStateForName,
in, nameToOffsets.getValue());
} else {
// 恢复 ListState,将恢复出来的元素 add 到 ListState 中
deserializeOperatorStateValues(listStateForName,
in, nameToOffsets.getValue());
}
}
} finally {
Thread.currentThread().setContextClassLoader(restoreClassLoader);
if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
IOUtils.closeQuietly(in);
}
}
}
return null;
}
restore 方法中拿到的就是 JM 分配给当前 subtask 的 stateHandles,如果 stateHandles 为空表示没有要恢复的 State 则直接返回 null,可能是因为任务是直接启动,而不是从 Checkpoint 处恢复。否则 stateHandles 不为空的情况,就遍历一个个 OperatorStateHandle,通过 stateHandle 可以获取 InputStream 读取数据。
首先读出元数据,用于创建 PartitionableListState,并没有真正恢复 State 数据,PartitionableListState 是 OperatorState 对 ListState 的具体实现。ListState 维护在 registeredOperatorStates 这个 Map 中,通过 StateName 从 registeredOperatorStates 中 get,get 不到时,通过元数据创建 State,并存放在 registeredOperatorStates 中。
代码中省略了 BroadcastState 的创建流程,整体流程与 ListState 流程类似,只不过 BroadcastState 维护在 registeredBroadcastStates 中。
最后真正的恢复 State 数据,对于 ListState 而言将恢复出来的元素 add 到 ListState 中。恢复 State 数据的过程其实用反序列化器对状态数据反序列化生成对象的过程。反序列化器维护在 PartitionableListState 的元数据中。
到这里 OperatorState 就恢复完成,此时映射关系已经保存到 OperatorStateRestoreOperation 类的两个 Map 集合中。现在又回到 DefaultOperatorStateBackendBuilder 类的 build 方法中,就会发现其实这两个 Map 是好多地方共享的。这里再贴一下 build 方法的完整源码,重点关注两个 Map:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
// 保存 StateName 和 ListState 的映射关系
Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
// 保存 StateName 和 BroadcastState 的映射关系
Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates =
new HashMap<>();
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
cancelStreamRegistry,
userClassloader,
// 将两个 Map 传递进去,即:restore 过程中,映射关系会存储在这两个 Map 中
registeredOperatorStates,
registeredBroadcastStates,
restoreStateHandles
);
try {
// OperatorState 恢复流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(
userClassloader,
asynchronousSnapshots,
// 将两个 Map 传递给 DefaultOperatorStateBackendSnapshotStrategy
registeredOperatorStates,
registeredBroadcastStates,
cancelStreamRegistryForBackend);
return new DefaultOperatorStateBackend(
executionConfig,
cancelStreamRegistryForBackend,
// 再将两个 Map 传递给 DefaultOperatorStateBackend
registeredOperatorStates,
registeredBroadcastStates,
new HashMap<>(),
new HashMap<>(),
snapshotStrategy
);
}
可以看到 build 方法刚开始会 new 两个 Map,然后传递给了 OperatorStateRestoreOperation,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢复流程)实际上将 Checkpoint 中恢复出来的映射关系保存到了这两个 Map 中。之后两个 Map 又传递给了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend。
所以得出结论:DefaultOperatorStateBackend 中持有从 Checkpoint 处恢复出来的 StateName 与具体 State 的映射关系。
到这里 DefaultOperatorStateBackend 就创建完成了,同时留两个问题:
-
上面流程虽然将 OperatorState 从 Checkpoint 中恢复了,但用户在算子中创建的 State 如何与 Checkpoint 中恢复的 OperatorState 关联起来呢?
-
另外对于直接启动,不从 Checkpoint 处恢复的任务,OperatorState 又是如何创建出来的?
带着这两个问题阅读下面流程。
二、 用户定义的 OperatorState 创建流程
Flink 源码中最典型的使用 OperatorState 的场景就是 FlinkConsumer 使用 ListState 去维护 Kafka 的 offset 信息,所以本文就从这块源码入手,看一下这个 ListState 创建流程。
FlinkKafkaConsumerBase 类的 initializeState 方法中用到了 getUnionListState 创建一个 ListState,简洁版源码如下所示:
@Override
public final void initializeState(FunctionInitializationContext context) {
OperatorStateStore stateStore = context.getOperatorStateStore();
unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}
这里调用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是个接口,它只有一个实现类,就是前面创建出来的 DefaultOperatorStateBackend。所以这里会调用 DefaultOperatorStateBackend 类的 getUnionListState 方法。不过 DefaultOperatorStateBackend 中还有一个 getListState(ListStateDescriptor stateDescriptor) 方法,这也就是 OperatorState 类型的 ListState 两种获取方式。可以看一下这两个方法的源码:
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}
源码中可以看到,无论业务使用的是 getListState 还是 getUnionListState 方法获取 ListState ,最后都会调用同一个方法,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)。加了一个参数 OperatorStateHandle.Mode 用于区分 OperatorState 的模式:
- getListState 对应 SPLIT_DISTRIBUTE 模式
- getUnionListState 对应 UNION 模式
getListState(stateDescriptor, mode) 方法源码如下所示:
// 无论是 getListState 还是 getUnionListState 方法都会调用这里,
// 只不过传递的 Mode 参数不同而已
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
TypeSerializer<S> partitionStateSerializer =
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
PartitionableListState<S> partitionableListState = (PartitionableListState<S>)
registeredOperatorStates.get(name);
// registeredOperatorStates 中维护的是 Checkpoint 中恢复的 StateName 和 ListState 的映射关系
// 如果 partitionableListState == null 表示从 Checkpoint 中没有恢复出这个 State,
// 即:这是一个新的 State,则新建一个 PartitionableListState,并维护在 Map 中
if (null == partitionableListState) {
partitionableListState = new PartitionableListState<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
registeredOperatorStates.put(name, partitionableListState);
} else {
// State 已经从 Checkpoint 中恢复了,检查兼容性问题
// 这里会检查 StateName 和 AssignmentMode 是否可以匹配
checkStateNameAndMode(
partitionableListState.getStateMetaInfo().getName(),
name,
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);
RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
partitionableListState.getStateMetaInfo();
// 检查 序列化是否兼容
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
TypeSerializerSchemaCompatibility<S> stateCompatibility =
restoredPartitionableListStateMetaInfo.
updatePartitionStateSerializer(newPartitionStateSerializer);
// 不兼容,则抛出异常
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("XXX.");
}
partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
}
accessedStatesByName.put(name, partitionableListState);
// 返回 State
return partitionableListState;
}
getListState(stateDescriptor, mode) 方法首先通过 name 从 registeredOperatorStates 中 get 对应的 ListState 保存到 partitionableListState 中,registeredOperatorStates 维护的是 Checkpoint 中恢复的 StateName 和 ListState 的映射关系。所以 partitionableListState == null 表示从 Checkpoint 中没有恢复出这个 State,即:这是一个新的 State,所以新建一个 PartitionableListState,并保存在 registeredOperatorStates 中。
反之,partitionableListState != null 表示 State 已经从 Checkpoint 中恢复了,开始检查兼容性,首先会检查 Checkpoint 中恢复的 State 和用户新申请的 StateName 和 AssignmentMode 是否可以匹配。
- StateName 和 name 肯定是匹配的,因为 partitionableListState 是根据 name get 出来的。
- AssignmentMode 枚举用于区分应用层使用的 getListState 恢复还是 getUnionListState 恢复,getListState 表示 SPLIT_DISTRIBUTE 模式,getUnionListState 表示 UNION 模式。如果 State 中存储的是 SPLIT_DISTRIBUTE 模式,但任务恢复时,代码改成了 getUnionListState,实际上 State 不能正常恢复的。
StateName 和 AssignmentMode 检查完毕后,会检查序列化是否兼容,不兼容,则抛出异常。兼容则会返回 State。
上述流程就回答了最开始提的两个问题:
-
OperatorState 从 Checkpoint 中恢复后,用户在算子中创建的 State 如何与 Checkpoint 中恢复的 OperatorState 关联起来呢?
答:依赖 registeredOperatorStates 这个 Map 维护了 StateName 和 ListState 的映射关系,用户创建 State 是通过 StateName 从 registeredOperatorStates 中查找,如果能找到,对其进行兼容性检查,检查通过就会返回从 Checkpoint 中恢复的 ListState,从而完成了关联。
-
对于直接启动,不从 Checkpoint 处恢复的任务,OperatorState 又是如何创建出来的?
答:对于直接启动的任务,registeredOperatorStates 肯定是空的。创建 State 时,从 registeredOperatorStates 中 get 不到,所以就创建一个新的 PartitionableListState,并保存在 registeredOperatorStates 中。
到这里,OperatorState 就完成了恢复,且用户的 State 也正常的创建出来了。
三、总结
文中首先介绍了 OperatorState 的恢复和创建流程,并介绍从 Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来的。后续将会详细介绍 KeyedState 的恢复创建流程以及如何将 Checkpoint 处恢复的 State 如何与代码中创建的 State 关联起来。