Flink 源码:JM 端从 Checkpoint 恢复流程
本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/hJsyzQo4lreEpgSAiLRt2g
本文主要讲述:
- JM 端恢复 Checkpoint 元数据流程
- JM 端为 subtask 分配 StateHandle 流程
一、 JM 端恢复 Checkpoint 元数据流程
JM 端从 Checkpoint 中恢复任务的流程是从 CheckpointCoordinator 的 restoreSavepoint 方法开始的。restoreSavepoint 方法的源码如下所示:
public boolean restoreSavepoint(
String savepointPointer,
boolean allowNonRestored,
Map<JobVertexID, ExecutionJobVertex> tasks,
ClassLoader userClassLoader) throws Exception {
// 从外部存储获取 Checkpoint 元数据,这里的 checkpointStorage 是 FsCheckpointStorage
final CompletedCheckpointStorageLocation checkpointLocation =
checkpointStorage.resolveCheckpoint(savepointPointer);
// 加载 Checkpoint 元数据,并对 Checkpoint 进行校验,
// 校验项包括 maxParallelism、allowNonRestoredState
CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
job, tasks, checkpointLocation, userClassLoader, allowNonRestored);
// 将要恢复的 Checkpoint 信息写入到 zk 中
completedCheckpointStore.addCheckpoint(savepoint);
// Reset the checkpoint ID counter
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
// 从最近一次 Checkpoint 处恢复 State
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
首先从 savepointPointer 所在的目录找到 _metadata 文件(Checkpoint 的元数据文件),然后生成 CompletedCheckpointStorageLocation。CompletedCheckpointStorageLocation 见名之意:已经完成的 Checkpoint 存储位置。
加载元数据及校验逻辑
然后 Checkpoints.loadAndValidateCheckpoint() 方法加载 Checkpoint 元数据,并对 Checkpoint 进行校验,校验项包括 maxParallelism、allowNonRestoredState。Checkpoints 类 loadAndValidateCheckpoint() 方法的精简版校验源码如下所示:
public static CompletedCheckpoint loadAndValidateCheckpoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
CompletedCheckpointStorageLocation location,
ClassLoader classLoader,
boolean allowNonRestoredState) throws IOException {
// 从新的 ExecutionGraph 中生成 OperatorId 与 ExecutionJobVertex 的映射
Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
for (ExecutionJobVertex task : tasks.values()) {
for (OperatorID operatorID : task.getOperatorIDs()) {
operatorToJobVertexMapping.put(operatorID, task);
}
}
HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
// 循环检查所有的 OperatorState,这里的 OperatorState 不是指 Flink 的 OperatorState,
// 而是指 算子级别的 State,这里的 Operator 指代算子
for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
// 在新的 ExecutionGraph 中找 Checkpoint 中 OperatorId 对应的算子
ExecutionJobVertex executionJobVertex =
operatorToJobVertexMapping.get(operatorState.getOperatorID());
// executionJobVertex == null 说明有 OperatorState ,但找不到对应的 executionJobVertex
if (executionJobVertex != null) {
// 只要新旧 maxParallelism 相同,或者 新的 maxParallelism 没有配置,都认为校验通过
if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
|| !executionJobVertex.isMaxParallelismConfigured()) {
operatorStates.put(operatorState.getOperatorID(), operatorState);
} else {
// 相反 新旧 Job 的 maxParallelism 不同,
// 且新 Job 的 maxParallelism 是用户手动设定的,则抛出异常,恢复失败
throw new IllegalStateException(msg);
}
} else if (allowNonRestoredState) {
// 跳过了恢复流程
LOG.info("Skipping savepoint state for operator {}.");
} else {
// 不允许跳过恢复,检查当前 算子,是否包含状态,
// 如果包含,则抛异常,Job 无法启动
for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
if (operatorSubtaskState.hasState()) {
throw new IllegalStateException(msg);
}
}
}
}
// (3) convert to checkpoint so the system can fall back to it
CheckpointProperties props = CheckpointProperties.forSavepoint();
return new CompletedCheckpoint(XXX);
}
loadAndValidateCheckpoint 方法的检验过程,首先遍历新 Job 的 ExecutionGraph 中,通过 ExecutionGraph 生成 OperatorId 与 ExecutionJobVertex 的映射保存到 operatorToJobVertexMapping 中。然后循环检查所有的 OperatorState,这里的 OperatorState 不是指 Flink 的 OperatorState,而是指算子级别的 State(Operator 指代算子)。
在新的 ExecutionGraph 中找 Checkpoint 中 OperatorId 对应的算子保存到 executionJobVertex 中。executionJobVertex == null 说明有 OperatorState,但在新的 ExecutionGraph 中找不到对应的 executionJobVertex。
executionJobVertex != null 的情况
executionJobVertex != null 说明当前遍历的算子在新的 ExecutionGraph 中可以找到,此时检查新旧任务的 maxParallelism 是否可以匹配。只要新旧 maxParallelism 相同或者新的 maxParallelism 没有配置,都认为校验通过。这里 maxParallelism 没有设置指的是 maxParallelism 可能是 Flink 引擎自动生成的,而不是用户在代码中手动设置的。相反新旧 Job 的 maxParallelism 不同,且新 Job 的 maxParallelism 是用户手动设定的,则抛出异常,恢复失败。这也是在 《从 KeyGroup 到 Rescale》文章中讲到的,maxParallelism 不同,任务不能恢复。
executionJobVertex == null 的情况
executionJobVertex == null 说明有 OperatorState,但在新的 ExecutionGraph 中找不到对应的 executionJobVertex。这种现象说明 Flink 任务的代码发生了变动,删除了一些有状态算子,使得 Checkpoint 中保存的一些 State 不能正常恢复了。
executionJobVertex == null 的情况要对 allowNonRestoredState 参数进行检查,allowNonRestoredState 表示跳过那些无法映射到新程序的状态。如果 allowNonRestoredState 设置为 true,则 Flink 会跳过恢复当前算子的 State;如果 allowNonRestoredState 设置为 false,Flink 会检查当前算子是否有 State,如果有 State 则抛出异常,任务恢复失败。
恢复元数据
校验流程结束后,会将本次 Checkpoint 信息写入 zk,便于从 Checkpoint 中恢复。
restoreLatestCheckpointedState 从 zk 恢复元数据源码如下所示:
// 从 zk 获取所有可以恢复的 Checkpoint 信息,并从最近一次恢复// 从 zk 获取所有可以恢复的 Checkpoint 信息,并从最近一次恢复
completedCheckpointStore.recover();
// Now, we re-register all (shared) states from the checkpoint store with the new registry
for (CompletedCheckpoint completedCheckpoint :
completedCheckpointStore.getAllCheckpoints()) {
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(XXX);
// Checkpoint 中恢复出来的 State
final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
StateAssignmentOperation stateAssignmentOperation =
new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
// 重点:JM 给各个 TM 分配 State
stateAssignmentOperation.assignStates();
restoreLatestCheckpointedState 方法会从最近一次 Checkpoint 中恢复,因为任务刚启动, zk 中只存放了刚刚保存的这一次 Checkpoint 信息,所以这里就是从用户刚指定的 Checkpoint 目录进行恢复。恢复出来的是 CompletedCheckpoint 对象,也就是那一次 Checkpoint 完成时对应的元数据信息。
Checkpoint 元数据恢复完成后,JM 就会拿着元数据将 StateHandle 合理的分配给各个 subtask,下面来看一下分配流程。
二、 JM 端为 subtask 分配 StateHandle 流程
循环遍历一个个 ExecutionJobVertex
源码现在来到了 StateAssignmentOperation 类的 assignStates() 方法,assignStates 方法源码如下:
public void assignStates() {
// localOperators 保存所有的 恢复出来的 State
Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
// step 1: check
// 检查 Checkpoint 中恢复的所有 State 是否可以映射到 ExecutionGraph 上。
// 如果有 State 匹配不到执行的算子,且用户要求必须能够严格匹配,则抛出异常
checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
// step 2: 循环遍历一个个 ExecutionJobVertex
// 从最新的 ExecutionGraph 中遍历一个个 task ,一个个 task 去进行 assign
for (Map.Entry<JobVertexID, ExecutionJobVertex> task : this.tasks.entrySet()) {
final ExecutionJobVertex executionJobVertex = task.getValue();
// find the states of all operators belonging to this task
List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
// 记录所有有状态算子 OperatorState
List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
boolean statelessTask = true;
for (int x = 0; x < operatorIDs.size(); x++) {
// 优先使用 altOperatorID,altOperatorIDs 为空再使用 operatorIDs
OperatorID operatorID = altOperatorIDs.get(x) == null
? operatorIDs.get(x)
: altOperatorIDs.get(x);
OperatorState operatorState = localOperators.remove(operatorID);
// operatorState 为空,表示当前 Operator 没有恢复出来的 State
if (operatorState == null) {
// 无状态的 Operator,设置 OperatorID、并行度、最大并行度
// 这三个指标都与 新的 ExecutionGraph 中数据一致
operatorState = new OperatorState(
operatorID,
executionJobVertex.getParallelism(),
executionJobVertex.getMaxParallelism());
} else {
// 能走到这里,说明当前 task 找到了某个 Operator 的 State
statelessTask = false;
}
operatorStates.add(operatorState);
}
// statelessTask == true 表示当前 task 的所有 Operator 都不需要从 State 恢复
if (statelessTask) { // skip tasks where no operator has any state
continue;
}
// step 3: 真正分配 StateHandle 的逻辑
// 给 ExecutionGraph 中各个 subtask 分配 StateHandle,包括 Operator 和 Keyed
assignAttemptState(task.getValue(), operatorStates);
}
}
第一步仍然是对 State 进行检查。检查 Checkpoint 中恢复的所有 State 是否可以映射到新 Job 的 ExecutionGraph 上,并结合 allowNonRestoredState 参数进行校验。
第二步从最新的 ExecutionGraph 中遍历一个个 ExecutionJobVertex ,一个个 ExecutionJobVertex 去进行 assign。可能存在 OperatorChain,因此 ExecutionJobVertex 中可能保存多个 Operator,所以这里遍历 ExecutionJobVertex 中对应的所有 Operator 算子,看一个个 Operator 算子是否有状态的。用变量 statelessTask 标识当前 ExecutionJobVertex 是否是有状态的,如果 statelessTask 为 true 表示当前 ExecutionJobVertex 上所有 Operator 都是无状态的,直接 continue,即:不分配 State。否则当前 ExecutionJobVertex 上存在有状态的 Operator,需要走分配 State 的逻辑。
第三步 assignAttemptState(task.getValue(), operatorStates) 方法是真正分配 StateHandle 的逻辑,
为 ExecutionJobVertex 分配 StateHandle
assignAttemptState 源码如下:
private void assignAttemptState(ExecutionJobVertex executionJobVertex,
List<OperatorState> operatorStates) {
List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
//1. first compute the new parallelism
// check 并行度相关是否符合规则:
// 1、 Job 的并行度是否大于 Checkpoint 状态中保存的 最大并行度
// 2、 判断 MaxParallelism 是否改变:没有改变,则直接跳过
// 改变的情况:
// 如果用户主动配置了 MaxParallelism 则任务不能恢复,
// 如果 MaxParallelism 改变是因为框架自动改变的,则将 JobVertex 的 MaxParallelism 设置为 State 的 MaxParallelism
checkParallelismPreconditions(operatorStates, executionJobVertex);
// 为当前新的 ExecutionJobVertex 的所有 ExecutionVertex 生成了 KeyGroupRange
// ExecutionVertex 也就是 subtask 的概念
int newParallelism = executionJobVertex.getParallelism();
List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
executionJobVertex.getMaxParallelism(),
newParallelism);
final int expectedNumberOfSubTasks = newParallelism * operatorIDs.size();
// 给当前 ExecutionJobVertex 的所有 ExecutionVertex 重新分配 OperatorState
Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates =
new HashMap<>(expectedNumberOfSubTasks);
Map<OperatorInstanceID, List<OperatorStateHandle>> newRawOperatorStates =
new HashMap<>(expectedNumberOfSubTasks);
reDistributePartitionableStates(xxx);
// 给当前 ExecutionJobVertex 的所有 ExecutionVertex 重新分配 KeyedState
Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState =
new HashMap<>(expectedNumberOfSubTasks);
Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState =
new HashMap<>(expectedNumberOfSubTasks);
reDistributeKeyedStates(xxx);
// 将 四种 StateHandle 封装到 ExecutionJobVertex 中。
assignTaskStateToExecutionJobVertices(xxx);
}
assignAttemptState 主要包含以下五个工作:
- 并行度检验:对并行度和 MaxParallelism 进行检验,并设置合理的 MaxParallelism
- 为当前 ExecutionJobVertex 的所有 subtask 生成对应的 KeyGroupRange
- 给当前 ExecutionJobVertex 的所有 subtask 重新分配 OperatorState
- 给当前 ExecutionJobVertex 的所有 subtask 重新分配 KeyedState
- 将生成的四种 StateHandle 封装到 ExecutionJobVertex 中
下面分别介绍五个步骤。
三、 并行度检验
校验相关源码如下所示:
public void checkParallelismPreconditions(List<OperatorState> operatorStates,
ExecutionJobVertex executionJobVertex) {
for (OperatorState operatorState : operatorStates) {
checkParallelismPreconditions(operatorState, executionJobVertex);
}
}
private static void checkParallelismPreconditions(OperatorState operatorState,
ExecutionJobVertex executionJobVertex) {
// executionJobVertex 的并行度大于 State 设置的 MaxParallelism ,任务不能启动
if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
throw new IllegalStateException("");
}
if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
// MaxParallelism 改变的情况:
// 如果用户主动配置了 MaxParallelism 则任务不能恢复,
// 如果 MaxParallelism 改变是因为框架自动改变的,则将 JobVertex 的 MaxParallelism 设置为 State 的 MaxParallelism
if (!executionJobVertex.isMaxParallelismConfigured()) {
// if the max parallelism was not explicitly specified by the user, we derive it from the state
LOG.debug("Overriding maximum parallelism for JobVertex {} from {} to {}");
executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
} else {
// if the max parallelism was explicitly specified, we complain on mismatch
throw new IllegalStateException("The maximum parallelism");
}
}
}
校验并行度相关是否符合规则:
1、 Job 的并行度是否超过 Checkpoint 状态中保存的 最大并行度,如果超过,直接抛出异常,无法恢复
2、 判断新旧 Job 的 MaxParallelism 是否改变:没有改变,则直接跳过。如果改变了,则判断这个改变到底是用户主动设定的,还是 Flink 引擎生成导致的改变。
- 如果用户主动配置了 MaxParallelism 则任务不能恢复
- 如果 MaxParallelism 改变是因为框架自动改变的,则将 JobVertex 的 MaxParallelism 设置为 State 的 MaxParallelism
四、 给 subtask 生成 KeyGroupRange
在 KeyGroup 中讲到了,只要算子的并行度和 MaxParallelism 确定了,那么当前算子的每个 subtask 负责哪些 KeyGroup 也就确定了,即:每个 subtask 对应的 KeyGroupRange 就会确定。
所以给 subtask 生成 KeyGroupRange 的过程很简单,就是调用 KeyGroupRangeAssignment 工具类为每个 subtask 生成而已,源码如下所示:
public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups, int parallelism) {
Preconditions.checkArgument(numberKeyGroups >= parallelism);
List<KeyGroupRange> result = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
result.add(KeyGroupRangeAssignment.
computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
}
return result;
}
五、 给所有 subtask 重新分配 KeyedState
任务从 Checkpoint 启动时 KeyedState 应该如何分配,在之前的《Checkpoint 元数据详解》文中详细讲过,这里看一下代码实现。对应的 reDistributeKeyedStates 相关代码如下所示:
private void reDistributeKeyedStates(XXX) {
// 遍历一个个 Operator 算子
for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
// 拿到当前算子所有 subtask 的 State 元信息
OperatorState operatorState = oldOperatorStates.get(operatorIndex);
int oldParallelism = operatorState.getParallelism();
// 遍历一个个新的 subtask,看应该分配哪些 StateHandle 给这些新的 subtask
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex,
newOperatorIDs.get(operatorIndex));
// 注:这里虽然遍历的一个个 subtask,给这个 subtask 分配 StateHandle
// 但是却将 Checkpoint 处恢复的当前算子所有 subtask 的 State 元信息
// 传递给 reAssignSubKeyedStates 方法
Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates =
reAssignSubKeyedStates(
operatorState,
newKeyGroupPartitions,
subTaskIndex,
newParallelism,
oldParallelism);
newManagedKeyedState.put(instanceID, subKeyedStates.f0);
newRawKeyedState.put(instanceID, subKeyedStates.f1);
}
}
}
遍历新 Job 的一个个 Operator 算子,从 oldOperatorStates 中获取到当前 Operator 算子所有 subtask 的 State 元信息。再去遍历 Operator 的一个个 subtask,看应该分配哪些 StateHandle 给这些新的 subtask。调用 reAssignSubKeyedStates 方法给 subtask 分配 StateHandle。
这里有个细节:reAssignSubKeyedStates 方法是给某一个 subtask 分配 StateHandle,但是却将 Checkpoint 处恢复的当前算子所有 subtask 的 State 元信息传递给 reAssignSubKeyedStates 方法。因为,要从旧 Job 的所有 subtask 中筛选到底要将哪些 StateHandle 分配给这个 subtask。
reAssignSubKeyedStates 方法源码如下所示:
private Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>>
reAssignSubKeyedStates(XXX) {
// 并行度没有改变,直接按照旧 Job 的 State 进行分配
if (newParallelism == oldParallelism) {
// 旧 Job 的当前 subtask 有 state,直接赋值
if (operatorState.getState(subTaskIndex) != null) {
subManagedKeyedState = operatorState.getState(subTaskIndex)
.getManagedKeyedState().asList();
subRawKeyedState = operatorState.getState(subTaskIndex)
.getRawKeyedState().asList();
} else {
// 旧 Job 的当前 subtask 没有 state,创建两个空的集合返回
subManagedKeyedState = Collections.emptyList();
subRawKeyedState = Collections.emptyList();
}
} else {
// 并行度改变的情况,把当前 OperatorState 的所有 subtask 的 StateHandle 全部遍历一遍,
// 看是否与 subtaskKeyGroupRange 有交集,有交集则将对应的 Handle 添加到 list 用于恢复
subManagedKeyedState = getManagedKeyedStateHandles(
operatorState, keyGroupPartitions.get(subTaskIndex));
subRawKeyedState = getRawKeyedStateHandles(
operatorState, keyGroupPartitions.get(subTaskIndex));
}
if (subManagedKeyedState.isEmpty() && subRawKeyedState.isEmpty()) {
return new Tuple2<>(Collections.emptyList(), Collections.emptyList());
} else {
return new Tuple2<>(subManagedKeyedState, subRawKeyedState);
}
}
reAssignSubKeyedStates 方法中主要分为两种情况:
并行度没有改变,则直接按照旧 Job 的 State 进行分配。旧 Job 的当前 subtask 如果有 State,直接赋值;旧 Job 的当前 subtask 没有 State,创建两个空的集合作为结果返回。
并行度改变的情况,依照之前的原理分析,需要把当前 Operator 算子所有 subtask 的 StateHandle 全部遍历一遍,看是否与当前 subtask 负责的 KeyGroupRange 有交集,有交集则将对应的 Handle 添加到 list 用于恢复。具体逻辑在 getManagedKeyedStateHandles 方法中。
getManagedKeyedStateHandles 方法相关源码如下所示:
public static List<KeyedStateHandle> getManagedKeyedStateHandles(
OperatorState operatorState,
KeyGroupRange subtaskKeyGroupRange) {
final int parallelism = operatorState.getParallelism();
List<KeyedStateHandle> subtaskKeyedStateHandles = null;
// 把从 Checkpoint 处恢复的 OperatorState 的所有 subtask 全部遍历一遍,
// 看是否与 subtaskKeyGroupRange 有交集,有交集则对应的 Handle 需要用于恢复
for (int i = 0; i < parallelism; i++) {
if (operatorState.getState(i) != null) {
Collection<KeyedStateHandle> keyedStateHandles =
operatorState.getState(i).getManagedKeyedState();
if (subtaskKeyedStateHandles == null) {
subtaskKeyedStateHandles = new ArrayList<>(
parallelism * keyedStateHandles.size());
}
// 将交集的 StateHandles 加入到 list 中
extractIntersectingState(
keyedStateHandles,
subtaskKeyGroupRange,
subtaskKeyedStateHandles);
}
}
return subtaskKeyedStateHandles;
}
getManagedKeyedStateHandles 方法会把从 Checkpoint 处恢复的 OperatorState 所有 subtask 的 keyedStateHandles 遍历一遍,看是否与 subtaskKeyGroupRange 有交集,有交集则将对应的 keyedStateHandles 加入到 list 中返回。具体判断交集的过程在 extractIntersectingState 方法中。
extractIntersectingState 方法源码如下所示:
// 将 originalSubtaskStateHandles 与 rangeToExtract 有交集的 KeyedStateHandle,
// 添加到 extractedStateCollector 集合中
private static void extractIntersectingState(
Collection<KeyedStateHandle> originalSubtaskStateHandles,
KeyGroupRange rangeToExtract,
List<KeyedStateHandle> extractedStateCollector) {
for (KeyedStateHandle keyedStateHandle : originalSubtaskStateHandles) {
if (keyedStateHandle != null) {
// 调用 KeyedStateHandle 的 getIntersection 方法与 KeyGroupRange 求交集,
// 并返回交集对应的 KeyedStateHandle
KeyedStateHandle intersectedKeyedStateHandle =
keyedStateHandle.getIntersection(rangeToExtract);
// 交集不为 null,则加入到结果中
if (intersectedKeyedStateHandle != null) {
extractedStateCollector.add(intersectedKeyedStateHandle);
}
}
}
}
extractIntersectingState 逻辑比较简单,遍历一个个 KeyedStateHandle,调用 KeyedStateHandle 的 getIntersection 方法与 KeyGroupRange 求交集,并返回交集对应的 KeyedStateHandle。交集不为 null,则加入到结果中。
这里找交集的过程与之前分析的原理完全吻合,但是之前分析的原理得出结论,虽然都是找交集,但是 KeyGroupsStateHandle 模式和 IncrementalRemoteKeyedStateHandle 模式还是有很大区别的。KeyGroupsStateHandle 模式是按照 keyGroup 粒度进行存储的,IncrementalRemoteKeyedStateHandle 是按照 RocksDB 实例进行存储的。KeyGroupsStateHandle 和 IncrementalRemoteKeyedStateHandle 这两个类都实现了 KeyedStateHandle,所以再分析一下这两个类中 getIntersection 方法的具体实现。
KeyedStateHandle 与 KeyGroupRange 求交集流程分析
之前的例子放这里,方便观察两者区别:
假设旧任务并发为 2:
- subtask a 负责 KeyGroupRange(0,9)
- subtask b 负责 KeyGroupRange(10,19)
新任务并发为 3:
- subtask A 负责 KeyGroupRange(0,6)
- subtask B 负责 KeyGroupRange(7,13)
- subtask C 负责 KeyGroupRange(14,19)
KeyGroupsStateHandle 模式的分配结果:
-
subtask B 会拿到 subtask a 的 KeyGroupsStateHandle,只需要 KeyGroup 7、8、9 的 offset
-
subtask B 还会拿到 subtask b 的 KeyGroupsStateHandle,只需要 KeyGroup 10、11、12、13 的 offset
-
subtask A、C 省略
RocksDB Incremental 模式的分配结果:
- subtask B 依赖了 subtask a 的 IncrementalRemoteKeyedStateHandle,无法分割,需要全部拿到
- subtask B 依赖了 subtask b 的 IncrementalRemoteKeyedStateHandle,无法分割,需要全部拿到
- subtask A、C 省略
两种区别明白了,具体看一下代码实现。
RocksDB Incremental 模式求交集流程
IncrementalRemoteKeyedStateHandle 类的 getIntersection 方法源码如下所示:
// IncrementalRemoteKeyedStateHandle 模式下,存储的完整的 RocksDB 数据,
// 所以如果没有交集,返回 null。否则返回 整个 IncrementalRemoteKeyedStateHandle,
// 不能返回 IncrementalRemoteKeyedStateHandle 的一部分 KeyGroup 子集
@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
// 求两个 KeyGroupRange 的交集
KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
// 交集等于空的 KeyGroupRange,表示没有交集,返回 null。
// 有交集时不是返回交集,而是返回 this
return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(intersection)
? null : this;
}
代码逻辑简单干脆,先求两个 KeyGroupRange 的交集,然后判断交集是否等于空的 KeyGroupRange,如果 equals 返回 true 表示没有交集,则结果返回 null。有交集时不是返回交集,而是返回 this,即整个 IncrementalRemoteKeyedStateHandle。
KeyGroupsStateHandle 模式求交集流程
KeyGroupsStateHandle 类的 getIntersection 方法源码如下所示:
public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
// stateHandle 表示具体的文件句柄,这里可以看出 stateHandle 没有改变,即:文件不变
// 只是对 KeyGroupRangeOffsets 求交集,重新构造
return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange),
stateHandle);
}
可以看到 KeyGroupsStateHandle 模式,KeyGroupsStateHandle 与 KeyGroupRange 求交集时,永远会 new 一个对象,不会返回 null。只不过某些 KeyGroupsStateHandle 内存储的 KeyGroupRange 会出现 start > end,即:表示一个空的 KeyGroupsStateHandle(正常情况下 start <= end)。
重新构造时 stateHandle 表示具体的文件句柄,这里可以看出 stateHandle 没有改变,即:仍然读取之前的状态文件。只是对当前的 KeyGroupRangeOffsets 与 keyGroupRange 求交集,重新构造 KeyGroupsStateHandle 返回。
具体 KeyGroupRangeOffsets 与 keyGroupRange 求交集的过程就比较简单了,最后调用了 KeyGroupRange 的求交集方法,具体源码如下所示:
public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>>
, Serializable {
// 当前 Operator 当前 subtask 负责的 KeyGroupRange
private final KeyGroupRange keyGroupRange;
// 数组保存了每个 KeyGroup 对应的 offset,
// 所以:数组的长度 == keyGroupRange 中 KeyGroup 的数量
private final long[] offsets;
}
// KeyGroupRangeOffsets 类的 getIntersection 方法源码:
public KeyGroupRangeOffsets getIntersection(KeyGroupRange keyGroupRange) {
Preconditions.checkNotNull(keyGroupRange);
// 对两个 keyGroupRange 求交集,
KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
// subOffsets 中维护了每个 KeyGroup 对应的 offset,
// 求完交集,可能 keyGroupRange 范围缩小了,这里需要重新构建 subOffsets 数组
long[] subOffsets = new long[intersection.getNumberOfKeyGroups()];
if(subOffsets.length > 0) {
System.arraycopy(
offsets,
computeKeyGroupIndex(intersection.getStartKeyGroup()),
subOffsets,
0,
subOffsets.length);
}
return new KeyGroupRangeOffsets(intersection, subOffsets);
}
// KeyGroupRange 类的方法源码:
public KeyGroupRange getIntersection(KeyGroupRange other) {
int start = Math.max(startKeyGroup, other.startKeyGroup);
int end = Math.min(endKeyGroup, other.endKeyGroup);
return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP_RANGE;
}
KeyGroupRangeOffsets 求交集比较得到的是实际的 KeyGroupRange 交集,例如:KeyGroupRange(10,15) 与 KeyGroupRange(12,18) 的交集为 KeyGroupRange(12,15)。代码也比较简单,就不多解释了。
分配 KeyedState 小结
KeyGroupsStateHandle 和 IncrementalRemoteKeyedStateHandle 两种模式对应到源码中:JM 给 TM 分配的 StateHandle 与之前分析的原理完全相符,后续再分析 TM 拿到这些 StateHandle 后,如何在 TM 端恢复具体的 State。
六、 给所有 subtask 重新分配 OperatorState
OperatorState 分配流程过于复杂,这里简单描述一下。OperatorState 的分配最终依赖 RoundRobinOperatorStateRepartitioner,见名之意:轮询策略的 OperatorState 分配器。
RoundRobinOperatorStateRepartitioner 的 repartitionState 方法源码如下所示:
public List<List<OperatorStateHandle>> repartitionState(
List<List<OperatorStateHandle>> previousParallelSubtaskStates,
int oldParallelism,
int newParallelism) {
List<List<OperatorStateHandle>> result = new ArrayList<>(newParallelism);
List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList;
// 并行度不变的情况
if (newParallelism == oldParallelism) {
// collectUnionStates 方法用于 找出所有 Union 类型的 State
Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>
unionStates = collectUnionStates(previousParallelSubtaskStates);
// unionStates 为空,表示都是 SPLIT_DISTRIBUTE 模式,直接按照老的 State 进行分配
if (unionStates.isEmpty()) {
return previousParallelSubtaskStates;
}
// 初始化 list 成 map 形式
mergeMapList = initMergeMapList(previousParallelSubtaskStates);
// unionState 遍历一遍,将所有 subtask 的 State,添加到所有的 map 中(分发到所有机器上)
repartitionUnionState(unionStates, mergeMapList);
} else {
// todo 并行度改变的情况,过于复杂,后续分析
// Reorganize: group by (State Name -> StreamStateHandle + Offsets)
GroupByStateNameResults nameToStateByMode =
groupByStateMode(previousParallelSubtaskStates);
// XXX
}
// map 转为 list,作为 结果
for (int i = 0; i < mergeMapList.size(); ++i) {
result.add(i, new ArrayList<>(mergeMapList.get(i).values()));
}
return result;
}
并行度不变的情况,先找出所有 Union 类型的 State 保存到 unionStates 中,unionStates 为空,表示都是 SPLIT_DISTRIBUTE 模式,直接按照老的 StateHandle 进行分配。Union 类型的 State 需要将所有 subtask 的 State,添加到所有的 map 中(分发到所有机器上)。
并行度改变的情况,过于复杂,后续再分析。
需要注意 OperatorState 分配完之后,分配完的结果如下图所示:
// 封装了 OperatorID 和 subtaskId
public class OperatorInstanceID {
private final OperatorID operatorId;
private final int subtaskId;
}
// 结果中维护了每个 subtask 及对应 OperatorStateHandle 的集合
Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates =
new HashMap<>(expectedNumberOfSubTasks);
public class OperatorStreamStateHandle implements OperatorStateHandle {
// map 中 key 是 StateName,value 是 StateMetaInfo
// StateMetaInfo 中封装的是当前 State 在状态文件所处的 offset 和 Mode
private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
// OperatorState 状态文件句柄,可以读出状态数据
private final StreamStateHandle delegateStateHandle;
}
结果中维护了每个 subtask 及对应 OperatorStateHandle 的集合。OperatorStateHandle 这个类在 Checkpoint 元数据文中详细介绍过,OperatorStateHandle 可以读取到对应的状态文件,并拿到算子中定义的所有 State 在状态文件中的 offset 和 Mode。有了这些元数据,TM 端就可以正常恢复了。
七、 将生成的四种 StateHandle 封装到 ExecutionJobVertex 中
OperatorState 和 KeyedState 的 StateHandle 分配完成后,存放到了四个 Map 集合中,然后 assignTaskStateToExecutionJobVertices 方法会通过两层循环遍历,将分配好的 StateHandle 封装成 OperatorSubtaskState,最后封装成 JobManagerTaskRestore 对象 set 到 ExecutionJobVertex 中。
整个流程结束!
八、 总结
文中首先讲述了 JM 端恢复 Checkpoint 元数据的流程,之后就是 JM 端为 subtask 分配 StateHandle 流程,包括:
- 并行度检验:对并行度和 MaxParallelism 进行检验,并设置合理的 MaxParallelism
- 为当前 ExecutionJobVertex 的所有 subtask 生成对应的 KeyGroupRange
- 给当前 ExecutionJobVertex 的所有 subtask 重新分配 OperatorState
- 给当前 ExecutionJobVertex 的所有 subtask 重新分配 KeyedState
- 将生成的四种 StateHandle 封装到 ExecutionJobVertex 中