spark 3.2 reuse pvc 功能改造
背景
spark reuse pvc
PVC: PersistentVolumeClaim. A PersistentVolumeClaim (PVC) is a request for storage by a user. It is similar to a Pod. Pods consume node resources and PVCs consume PV resources. Pods can request specific levels of resources (CPU and Memory). Claims can request specific size and access modes (e.g., they can be mounted ReadWriteOnce, ReadOnlyMany or ReadWriteMany, see AccessModes).
spark 3.2 推出 Support shuffle data recovery on the reused PVCs 功能,在spark on k8s部署模式下为pod申请pvc,在pod异常退出时,新创建的executor 通过复用历史pvc来恢复shuffle数据。
因为我们公司spark 任务使用 aws spot集群,每天都会有大量spark 任务因为节点回收导致task重算,从而导致执行时间过长、成本上升。因此,我们希望通过该特性减少 task 重算,通过复用 pvc 已经计算过的数据加速任务执行。
spark shuffle
前一个 Stage 的ShuffleMapTask进行 Shuffle Write, 把数据存储在 BlockManager 上面, 并且把数据位置元信息上报到 Driver 的 MapOutTrack 组件中, 下一个 Stage 根据数据位置元信息, 进行 Shuffle Read, 拉取上个 Stage 的输出数据。
在 Shuffle 阶段,如果有executor lost 发生, scheduler 会在 MapOutTrack 移除当前executor维护的元数据信息,scheduler 会重新调度 executorId上的所有task, Shuffle Read 阶段会重新调度前一个stage的部分task重新生成数据。
社区版
- 目标
不修改Spark调度逻辑情况下,尽可能从pvc中恢复shuffle数据。
StageA(100/300) -----> StageB(100/500) -----> StageC
StageC 在Shuffle read阶段因为Executor Lost出现FetchFailed异常,Spark的DAGScheduler会从 StageA 重算丢失的数据,如果在重算过程中 StageB 的shuffle 数据恢复了,则执行 StageC。 - 功能:
- 创建executor pod时,新建/复用已有的pvc (pvc数量-pod数量=可复用pvc)
- 启动executor时, 在shuffle write初始化时上报本地 shuffle元数据信息给MapOutTrack
- 局限
找了几个demo测试,发现基本没啥用,stage、task重算次数很多,pvc创建明显多于pod数量。-
对pvc数量管理不准确,数据恢复准确性很低。
- 社区版的pvc只会增加不会删除,在开启dynamicAllocation时,可能pvc数量远远大于pod数量,导致pvc被pod复用的准确性降低。
- 计算可复用PVC的逻辑误差较大,有些pod已经退出,但是持有的pvc标记为不可复用。
-
执行数据恢复操作的时间较晚
- 在executor 执行shuffle write时才上报数据,可能会导致stage重算。
-
查找shuffle 数据路径不正确
- cluser 模式下 shuffle 数据位于 blockmgr- 开头的文件夹。
-
stage重算时间过长,task重算次数过多
- Executor lost 则当前Executor计算过的task会重新计算一遍。
-
在 Shuffle Read 阶段,Executor lost 会调度前一个Stage 和 当前Stage丢失的任务
-
对pvc数量管理不准确,数据恢复准确性很低。
目标
- 通过复用 PVC 减少Stage、Task重算,降低Job的执行时间
实现
pvc数量精确控制
pod在创建时会从空闲的pvc中随机选择一个,如果pvc数量大于最终要运行的executor pod数量,可能会导致shuffle数据无法恢复。
- pod 删除时,当前pod使用的pvc一起删除。
- dynamicAllocation enable
- ContainerCreating timeout
- executor successed
- 有 Terminating 状态的Pod时,延迟创建,确保pvc被释放。
- 取 last executorPodsSnapshot 来精确计算可用pvc数量。
stage延迟调度
StageC 的task执行时抛出 FetchFailed 异常,DAGScheduler 会将 StageB 和 StageC 重新调度,调度 StageB时会调度 parent stageA 丢失的task。
- 开启reuse pvc 且stage retry times < spark.stage.maxConsecutiveAttempts /2 只调度当前Stage ,且延迟 spark.retry.shuffle.stage.delay 调度。等待pvc数据恢复
- stage retry times > spark.stage.maxConsecutiveAttempts /2 按原来逻辑计算
task 延迟失败重算
- 开启reuse pvc 在处理executor lost 时不重算已经成功的task,在shuffle read阶段如果pvc数据还未恢复再重新计算。
executor 恢复shuffle数据前置
- 在executor 初始化blockManger后,立即上报本地 shuffle 数据给MapOutTrack。
收益
spot实例回收的越多,性能提升越明显
缺陷
- executor pod 启动时间过长,依然等待超时,依然会将之前Stage进行retry。