Flink State Management Source Code Analysis (3): Snapshot Strategies
Snapshot Strategy (SnapshotStrategy)
Flink's checkpoint mechanism is built on distributed consistent snapshots, which is how it achieves exactly-once processing semantics. Both keyed state (HeapKeyedStateBackend, RocksDBKeyedStateBackend) and operator state (DefaultOperatorStateBackend) receive snapshot requests (via the snapshot method), while the actual snapshot work is delegated to a concrete snapshot strategy.
Below is the UML of Flink's snapshot strategies. For keyed state, HeapSnapshotStrategy and RocksDBSnapshotStrategyBase are the snapshot strategies of the heap and RocksDB state backends respectively (RocksDB is further split into full and incremental snapshots), while DefaultOperatorStateBackendSnapshotStrategy is the snapshot strategy of the operator state backend. Besides keyed state and operator state, a savepoint is essentially a special kind of snapshot, so the corresponding savepoint strategy SavepointSnapshotStrategy also implements the SnapshotStrategy interface.
Below is the definition of the SnapshotStrategy interface, which defines the steps required to take a snapshot:
- A synchronous part that prepares the resources needed for the snapshot, so that the next step can write the snapshot data.
- An asynchronous part that writes the snapshot data into the provided CheckpointStreamFactory.
public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {
//Synchronous part of the snapshot: prepares the resources required to take the snapshot.
SR syncPrepareResources(long checkpointId) throws Exception;
//Asynchronous part: writes the snapshot data to the given CheckpointStreamFactory.
SnapshotResultSupplier<S> asyncSnapshot(
SR syncPartResource,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions);
//Supplier that performs the asynchronous part of the snapshot.
@FunctionalInterface
interface SnapshotResultSupplier<S extends StateObject> {
//Performs the asynchronous part of a checkpoint and returns the snapshot result.
SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
}
}
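To make the two-phase contract concrete, below is a minimal, hypothetical driver (class and method names are illustrative, not Flink's actual runner): the synchronous part runs on the task thread, while the asynchronous supplier is wrapped in a future that can be executed later on an I/O thread and that releases the prepared resources when it is done.
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.*;

class SnapshotDriver {
    static <S extends StateObject, SR extends SnapshotResources>
            RunnableFuture<SnapshotResult<S>> triggerSnapshot(
                    SnapshotStrategy<S, SR> strategy,
                    long checkpointId,
                    long timestamp,
                    CheckpointStreamFactory streamFactory,
                    CheckpointOptions options) throws Exception {
        // 1. Synchronous part: capture a consistent view of the state on the task thread.
        SR resources = strategy.syncPrepareResources(checkpointId);
        // 2. Asynchronous part: obtain the supplier that will write the captured view.
        SnapshotStrategy.SnapshotResultSupplier<S> supplier =
                strategy.asyncSnapshot(resources, checkpointId, timestamp, streamFactory, options);
        // 3. Defer the actual writing; release the prepared resources once it is done.
        return new FutureTask<SnapshotResult<S>>(() -> {
            try (CloseableRegistry registry = new CloseableRegistry()) {
                return supplier.get(registry);
            } finally {
                resources.release();
            }
        });
    }
}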
Below is the UML corresponding to SnapshotResources:
- The full snapshot interface FullSnapshotResources has two implementations: the heap snapshot resources HeapSnapshotResources and the RocksDB full snapshot resources RocksDBFullSnapshotResources.
- IncrementalRocksDBSnapshotResources is the resource class for RocksDB incremental snapshots.
- DefaultOperatorStateBackendSnapshotResources is the resource class for operator state snapshots.
The SnapshotResources interface is defined below; it declares only a single release method, used to clean up the resources once the asynchronous part of the snapshot has finished.
@Internal
public interface SnapshotResources {
/** Cleans up the resources after the asynchronous part is done. */
void release();
}
We will look at the concrete resource implementations together with their corresponding snapshot strategies.
Heap Snapshot Strategy (HeapSnapshotStrategy)
Before looking at the heap snapshot strategy, let's first look at the resource class it uses, HeapSnapshotResources. The UML above shows that both the heap snapshot and the RocksDB full snapshot implement FullSnapshotResources, which also tells us that the heap state backend has no incremental snapshot implementation. FullSnapshotResources defines the backend-agnostic resources for taking a full snapshot, and both implementations write their snapshot data through FullSnapshotAsyncWriter. The FullSnapshotResources interface is defined below, where the type parameter K is the type of the keys stored in the backend.
public interface FullSnapshotResources<K> extends SnapshotResources {
//Returns the list of metadata for this state snapshot. A StateMetaInfoSnapshot records the snapshot metadata of each state, e.g. state name, backend type, serializers.
List<StateMetaInfoSnapshot> getMetaInfoSnapshots();
//Creates an iterator over the current snapshot.
KeyValueStateIterator createKVStateIterator() throws IOException;
//The KeyGroupRange covered by this snapshot.
KeyGroupRange getKeyGroupRange();
/** Returns key {@link TypeSerializer}. */
TypeSerializer<K> getKeySerializer();
/** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
StreamCompressionDecorator getStreamCompressionDecorator();
}
Now let's look at the two core methods of HeapSnapshotStrategy: syncPrepareResources and asyncSnapshot.
class HeapSnapshotStrategy<K>
implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
...
//Prepares the snapshot resources (HeapSnapshotResources).
@Override
public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
return HeapSnapshotResources.create(
registeredKVStates,
registeredPQStates,
keyGroupCompressionDecorator,
keyGroupRange,
getKeySerializer(),
totalKeyGroups);
}
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
HeapSnapshotResources<K> syncPartResource,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) {
......
//SupplierWithException is a Supplier-like functional interface that may throw: the first type parameter is the return type, the second the exception type thrown by the supplier.
final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
checkpointStreamSupplier =
localRecoveryConfig.isLocalRecoveryEnabled() //is local recovery enabled?
&& !checkpointOptions.getCheckpointType().isSavepoint()
? () ->
createDuplicatingStream( //local recovery enabled and not a savepoint: create a duplicating stream
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
streamFactory,
localRecoveryConfig
.getLocalStateDirectoryProvider())
: () ->
createSimpleStream(//no local recovery, or a savepoint: create a simple stream
CheckpointedStateScope.EXCLUSIVE, streamFactory);
return (snapshotCloseableRegistry) -> {
......
//the checkpoint output stream
final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
streamWithResultProvider.getCheckpointOutputStream();
//write the checkpoint data via KeyedBackendSerializationProxy
final DataOutputViewStreamWrapper outView =
new DataOutputViewStreamWrapper(localStream);
serializationProxy.write(outView);
......
};
}
}
The asyncSnapshot method above obtains the snapshot output stream through a CheckpointStreamWithResultProvider. This class essentially encapsulates how the output streams are obtained: if local state recovery is not configured, only one output stream is created and the snapshot data is written to the checkpoint storage configured for the job. If local recovery is configured, the state data also has to be written locally (for local recovery), so in that case two output streams are obtained: one writing to the configured checkpoint storage and one writing to local disk.
public interface CheckpointStreamWithResultProvider extends Closeable {
//Closes the output stream(s) and returns the snapshot result carrying the stream state handles.
@Nonnull
SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;
//Returns the snapshot output stream.
@Nonnull
CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();
@Override
default void close() throws IOException {
getCheckpointOutputStream().close();
}
...
}
The two inner implementations of CheckpointStreamWithResultProvider correspond exactly to these two cases: the simple stream (PrimaryStreamOnly creates a single output stream writing to the configured checkpoint storage, which may be remote HDFS, the JobManager, etc.) and the duplicating stream (PrimaryAndSecondaryStream holds two output streams: the first is the same as in PrimaryStreamOnly, the second writes to the local TaskManager disk for local recovery).
Creating the simple stream: as shown below, only a primary stream is created.
static CheckpointStreamWithResultProvider createSimpleStream(
@Nonnull CheckpointedStateScope checkpointedStateScope,
@Nonnull CheckpointStreamFactory primaryStreamFactory)
throws IOException {
//create the primary output stream
CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
Creating the duplicating stream: besides the primary stream, a secondary stream that writes to a local file is created.
@Nonnull
static CheckpointStreamWithResultProvider createDuplicatingStream(
@Nonnegative long checkpointId,
@Nonnull CheckpointedStateScope checkpointedStateScope,
@Nonnull CheckpointStreamFactory primaryStreamFactory,
@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
throws IOException {
CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
try {
//local file the checkpoint data is written to
File outFile =
new File(
secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(
checkpointId),
String.valueOf(UUID.randomUUID()));
Path outPath = new Path(outFile.toURI());
//build the file-based output stream
CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
primaryOut, secondaryOut);
} catch (IOException secondaryEx) {
LOG.warn(
"Exception when opening secondary/local checkpoint output stream. "
+ "Continue only with the primary stream.",
secondaryEx);
}
return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
The output streams above are created by a CheckpointStreamFactory. Such a stream writes checkpoint data to external storage; for example, FsCheckpointStreamFactory writes the checkpoint data to an external file system.
public interface CheckpointStreamFactory {
//Creates a new state output stream; CheckpointStateOutputStream is a static abstract inner class of CheckpointStreamFactory.
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
throws IOException;
//Base class CheckpointStateOutputStream; the concrete implementations live in the subclasses of CheckpointStreamFactory.
abstract class CheckpointStateOutputStream extends FSDataOutputStream {
//closes the stream and returns a handle to the written state
@Nullable
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
//closes the stream
@Override
public abstract void close() throws IOException;
}
}
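As a hedged illustration of how such a stream is typically consumed (the helper name writeBytes and the error handling are assumptions for this sketch, not Flink code): the payload is written to the stream, and closeAndGetHandle() finalizes it into a StreamStateHandle that later references the written state.
import java.io.IOException;
import org.apache.flink.runtime.state.*;

class CheckpointStreamExample {
    static StreamStateHandle writeBytes(CheckpointStreamFactory factory, byte[] payload)
            throws IOException {
        CheckpointStreamFactory.CheckpointStateOutputStream out =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        try {
            out.write(payload);                 // write the checkpoint payload
            return out.closeAndGetHandle();     // finalize and obtain the state handle
        } catch (IOException e) {
            out.close();                        // discard the partially written state
            throw e;
        }
    }
}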
RocksDB Snapshot Strategies
From the UML above we know that the RocksDB snapshot strategies revolve around three core classes: the abstract base class RocksDBSnapshotStrategyBase, the full snapshot strategy RocksFullSnapshotStrategy, and the incremental snapshot strategy RocksIncrementalSnapshotStrategy. RocksDBSnapshotStrategyBase only defines a number of RocksDB- and state-related member variables; the concrete logic lives in the subclasses.
Full Snapshot
The full snapshot strategy RocksFullSnapshotStrategy creates full snapshots of a RocksDBKeyedStateBackend: every checkpoint ships the complete state to remote storage (JobManager or HDFS). As before, let's look at the two core methods, syncPrepareResources and asyncSnapshot.
public class RocksFullSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
......
@Override
public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
//Build the RocksDB full snapshot resources. Compared to HeapSnapshotResources, RocksDBFullSnapshotResources
//additionally holds the RocksDB instance and a RocksDB Snapshot.
return RocksDBFullSnapshotResources.create(
kvStateInformation,
registeredPQStates,
db,
rocksDBResourceGuard,
keyGroupRange,
keySerializer,
keyGroupPrefixBytes,
keyGroupCompressionDecorator);
}
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
FullSnapshotResources<K> fullRocksDBSnapshotResources,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory checkpointStreamFactory,
@Nonnull CheckpointOptions checkpointOptions) {
if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
timestamp);
}
return registry -> SnapshotResult.empty();
}
//createCheckpointStreamSupplier works the same way as in the heap strategy: depending on whether local recovery is enabled, a duplicating or a simple stream is created
final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
checkpointStreamSupplier =
createCheckpointStreamSupplier(
checkpointId, checkpointStreamFactory, checkpointOptions);
//create the asynchronous full snapshot writer
return new FullSnapshotAsyncWriter<>(
checkpointOptions.getCheckpointType(),
checkpointStreamSupplier,
fullRocksDBSnapshotResources);
}
......
}
FullSnapshotAsyncWriter is also a SnapshotResultSupplier; it asynchronously writes the full snapshot data into the provided output stream.
public class FullSnapshotAsyncWriter<K>
implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
throws Exception {
......
//obtain the output stream(s)
final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
checkpointStreamSupplier.get();
snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
//write the snapshot data to the output stream
writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
......
}
private void writeSnapshotToOutputStream(
@Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
@Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
throws IOException, InterruptedException {
//write the snapshot data through an output view; note that checkpointStreamWithResultProvider may write two copies of the data
final DataOutputView outputView =
new DataOutputViewStreamWrapper(
checkpointStreamWithResultProvider.getCheckpointOutputStream());
//write the metadata
writeKVStateMetaData(outputView);
//write the state data of every state instance
try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
writeKVStateData(
kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
}
}
}
Now let's look at the most important part, writeKVStateData, to see how the full state actually gets written out. Setting the details aside, the essence is simply iterating over a KeyValueStateIterator.
private void writeKVStateData(
final KeyValueStateIterator mergeIterator,
final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
final KeyGroupRangeOffsets keyGroupRangeOffsets)
throws IOException, InterruptedException {
......
try {
......
//simply walk the KeyValueStateIterator
// main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
// key-group offsets.
while (mergeIterator.isValid()) {
......
writeKeyValuePair(previousKey, previousValue, kgOutView);
......
// request next k/v pair
previousKey = mergeIterator.key();
previousValue = mergeIterator.value();
mergeIterator.next();
}
......
} finally {
// this will just close the outer stream
IOUtils.closeQuietly(kgOutStream);
}
}
A KeyValueStateIterator exposes all key-value entries of the current snapshot; RocksDB and the heap backend each provide their own iterator implementation.
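As a small, hedged usage sketch of such an iterator (only the cursor-style calls visible in writeKVStateData above are assumed; the package path and the helper name are assumptions): consuming it is a plain isValid()/next() loop over serialized key and value bytes.
import org.apache.flink.runtime.state.KeyValueStateIterator;

class KvIterationSketch {
    // Sums the serialized size of all entries in a snapshot; purely illustrative.
    static long totalBytes(KeyValueStateIterator it) throws Exception {
        long bytes = 0;
        while (it.isValid()) {
            bytes += it.key().length + it.value().length; // serialized key/value bytes
            it.next();
        }
        return bytes;
    }
}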
Let's see how RocksStatesPerKeyGroupMergeIterator is created. When we looked at the FullSnapshotResources interface earlier we saw the abstract method createKVStateIterator, which exists precisely for creating this iterator; HeapSnapshotResources and RocksDBFullSnapshotResources implement it to create the heap and RocksDB iterators respectively. Below is the RocksDBFullSnapshotResources.createKVStateIterator implementation.
@Override
public KeyValueStateIterator createKVStateIterator() throws IOException {
......
try {
//Create RocksDB ReadOptions and pin them to the RocksDB snapshot taken during the synchronous phase of the checkpoint.
ReadOptions readOptions = new ReadOptions();
closeableRegistry.registerCloseable(readOptions::close);
readOptions.setSnapshot(snapshot);
//RocksIteratorWrapper is a thin wrapper around RocksDB's RocksIterator
List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
createKVStateIterators(closeableRegistry, readOptions);
.......
//RocksStatesPerKeyGroupMergeIterator merges the iterators of all state instances (column families) into a single iterator
return new RocksStatesPerKeyGroupMergeIterator(
closeableRegistry,
kvStateIterators,
heapPriorityQueueIterators,
keyGroupPrefixBytes);
} catch (Throwable t) {
IOUtils.closeQuietly(closeableRegistry);
throw new IOException("Error creating merge iterator", t);
}
}
private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
new ArrayList<>(metaData.size());
int kvStateId = 0;
//one iterator per state, i.e. per RocksDB column family
for (MetaData metaDataEntry : metaData) {
RocksIteratorWrapper rocksIteratorWrapper =
createRocksIteratorWrapper(
db,
metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
metaDataEntry.stateSnapshotTransformer,
readOptions);
kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
closeableRegistry.registerCloseable(rocksIteratorWrapper);
++kvStateId;
}
return kvStateIterators;
}
private static RocksIteratorWrapper createRocksIteratorWrapper(
RocksDB db,
ColumnFamilyHandle columnFamilyHandle,
StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
ReadOptions readOptions) {
//create the RocksDB iterator and wrap it in Flink's RocksIteratorWrapper
RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
return stateSnapshotTransformer == null
? new RocksIteratorWrapper(rocksIterator)
: new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
As the code shows, the iterator is essentially RocksDB's own iterator (reading from the pinned snapshot), which Flink wraps in a RocksIteratorWrapper (see RocksDB's own documentation on iterator error handling for why the wrapper is needed). Because there may be several state instances, each with its own iterator, Flink finally merges all of them into a single iterator, the RocksStatesPerKeyGroupMergeIterator.
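To illustrate why the wrapper is needed, here is a minimal sketch of the idea (assumed and simplified, not Flink's actual RocksIteratorWrapper): a RocksDB iterator can become invalid either because the data is exhausted or because an internal error occurred, so status() has to be checked explicitly instead of trusting isValid() alone.
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Hypothetical wrapper: surfaces internal RocksDB errors during iteration.
class CheckedRocksIterator implements AutoCloseable {
    private final RocksIterator delegate;

    CheckedRocksIterator(RocksIterator delegate) {
        this.delegate = delegate;
    }

    boolean isValid() throws RocksDBException {
        boolean valid = delegate.isValid();
        delegate.status(); // throws if iteration stopped because of an internal error
        return valid;
    }

    void next() { delegate.next(); }
    byte[] key() { return delegate.key(); }
    byte[] value() { return delegate.value(); }

    @Override
    public void close() { delegate.close(); }
}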
Incremental Snapshot
RocksIncrementalSnapshotStrategy is the incremental snapshot strategy of RocksDBKeyedStateBackend; it builds incremental snapshots on top of RocksDB's native checkpoints. Before going through its syncPrepareResources and asyncSnapshot, let's look at the key member variables used by the incremental snapshot.
//The incremental snapshot resources are held by the inner class IncrementalRocksDBSnapshotResources
public class RocksIncrementalSnapshotStrategy<K>
extends RocksDBSnapshotStrategyBase<
K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {
//base directory of the RocksDB instance
@Nonnull private final File instanceBasePath;
/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
@Nonnull private final UUID backendUID;
//maps each checkpoint id to the sst files materialized by that checkpoint
@Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
//id of the last completed checkpoint
private long lastCompletedCheckpointId;
//uploads the snapshot files (e.g. the sst files produced by the RocksDB checkpoint)
private final RocksDBStateUploader stateUploader;
...
}
Next, the synchronous resource-preparation phase, which mainly does two things:
- Look up the sst files produced by the most recent completed checkpoint via materializedSstFiles; they are the baseline for the incremental file comparison.
- Take a RocksDB native checkpoint.
@Override
public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
throws Exception {
//prepare the directory: a permanent directory if local recovery is enabled, a temporary one otherwise
final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
new ArrayList<>(kvStateInformation.size());
//sst files produced by the last completed checkpoint, used for the incremental comparison
final Set<StateHandleID> baseSstFiles =
snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
//take the RocksDB native checkpoint
takeDBNativeCheckpoint(snapshotDirectory);
return new IncrementalRocksDBSnapshotResources(
snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
}
takeDBNativeCheckpoint synchronously takes a RocksDB native checkpoint; the checkpoint data (sst files and misc files) is produced in the given directory.
private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
throws Exception {
try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
Checkpoint checkpoint = Checkpoint.create(db)) {
checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
} catch (Exception ex) {
......
}
}
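For reference, here is a standalone sketch of RocksDB's native checkpoint API outside of Flink (the paths and the demo class are assumptions): createCheckpoint materializes the current sst files into the target directory, typically as hard links when source and target are on the same file system, which is what makes this step cheap.
import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class NativeCheckpointDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo");
                Checkpoint checkpoint = Checkpoint.create(db)) {
            db.put("key".getBytes(), "value".getBytes());
            // The target directory must not exist yet; the sst files inside it are
            // typically hard links to the immutable files of the running instance.
            checkpoint.createCheckpoint("/tmp/rocksdb-demo-checkpoint-1");
        }
    }
}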
asyncSnapshot itself is very simple: it creates a RocksDBIncrementalSnapshotOperation supplier that performs the incremental snapshot.
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
IncrementalRocksDBSnapshotResources snapshotResources,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory checkpointStreamFactory,
@Nonnull CheckpointOptions checkpointOptions) {
...
return new RocksDBIncrementalSnapshotOperation(
checkpointId,
checkpointStreamFactory,
snapshotResources.snapshotDirectory, //directory the RocksDB native checkpoint was written to
snapshotResources.baseSstFiles, //sst files of the last completed checkpoint
snapshotResources.stateMetaInfoSnapshots);
}
Now let's look at the core of the incremental snapshot, RocksDBIncrementalSnapshotOperation.
private final class RocksDBIncrementalSnapshotOperation
implements SnapshotResultSupplier<KeyedStateHandle> {
...
@Override
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
throws Exception {
...
// sst files produced by the current RocksDB checkpoint
final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// misc (metadata) files of the current RocksDB checkpoint
final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
......
//upload the incremental sst files and the misc files; uploadSstFiles walks the RocksDB checkpoint directory and determines the newly added sst files
uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
//record the sst files belonging to the current checkpoint
synchronized (materializedSstFiles) {
materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
......
}
}
Next, the implementation of the uploadSstFiles method:
private void uploadSstFiles(
@Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
@Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
@Nonnull CloseableRegistry snapshotCloseableRegistry)
throws Exception {
//local paths of the incremental sst files
Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
//local paths of the misc files
Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
//contents of the current RocksDB checkpoint directory
Path[] files = localBackupDirectory.listDirectory();
if (files != null) {
//determine the incremental files
createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
//upload the incremental sst files with stateUploader
sstFiles.putAll(
stateUploader.uploadFilesToCheckpointFs(
sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
//upload the misc files
miscFiles.putAll(
stateUploader.uploadFilesToCheckpointFs(
miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
}
}
The createUploadFilePaths method above compares against the sst files of the last completed checkpoint to find the increment, and builds the sets of sst and misc files that have to be uploaded.
private void createUploadFilePaths(
Path[] files,
Map<StateHandleID, StreamStateHandle> sstFiles,
Map<StateHandleID, Path> sstFilePaths,
Map<StateHandleID, Path> miscFilePaths) {
for (Path filePath : files) {
final String fileName = filePath.getFileName().toString();
//state handle id derived from the file name
final StateHandleID stateHandleID = new StateHandleID(fileName);
//compare sst files against those of the last completed checkpoint to find the increment
if (fileName.endsWith(SST_FILE_SUFFIX)) {
final boolean existsAlready =
baseSstFiles != null && baseSstFiles.contains(stateHandleID);
if (existsAlready) {
//an sst file that already exists only gets a placeholder, indicating it was uploaded before and lives in the shared directory
sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
} else {
//a newly added file that will be uploaded
sstFilePaths.put(stateHandleID, filePath);
}
} else {
//misc files are always uploaded
miscFilePaths.put(stateHandleID, filePath);
}
}
}
To summarize, the incremental snapshot works as follows:
- A RocksDB native checkpoint produces the sst files of the current snapshot (thanks to the LSM design, sst files are immutable).
- For every checkpoint, Flink records the mapping from the checkpoint id to the sst files of that snapshot.
- The sst files and misc files belonging to the current checkpoint are uploaded.
- If a later checkpoint still contains sst files from an earlier one, those files do not have to be uploaded to HDFS again.
In short, Flink's incremental checkpoint cleverly exploits the fact that the sst files of an LSM tree are immutable and only ever added, as the sketch below illustrates.
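The following is an illustrative sketch in plain Java (not Flink code; the class and enum names are made up) of that upload decision: for each file in the new checkpoint directory, an sst file already known from the previous checkpoint is replaced by a placeholder, everything else is uploaded.
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class IncrementalUploadPlan {
    enum Action { UPLOAD, PLACEHOLDER }

    // currentFiles: file names in the new RocksDB checkpoint directory
    // previousSstFiles: sst files already uploaded by the last completed checkpoint
    static Map<String, Action> plan(Set<String> currentFiles, Set<String> previousSstFiles) {
        Map<String, Action> result = new HashMap<>();
        for (String file : currentFiles) {
            boolean reusable = file.endsWith(".sst") && previousSstFiles.contains(file);
            // reusable sst files only need a placeholder pointing at shared state;
            // new sst files and all misc/metadata files are uploaded again
            result.put(file, reusable ? Action.PLACEHOLDER : Action.UPLOAD);
        }
        return result;
    }
}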
Operator State Snapshot Strategy
Operator state has a single snapshot strategy, DefaultOperatorStateBackendSnapshotStrategy, which writes the snapshot data of the operator state's ListState and BroadcastState to the snapshot storage.
class DefaultOperatorStateBackendSnapshotStrategy
implements SnapshotStrategy<
OperatorStateHandle,
DefaultOperatorStateBackendSnapshotStrategy
.DefaultOperatorStateBackendSnapshotResources> {
private final ClassLoader userClassLoader;
//operator state only has two kinds of state: ListState and BroadcastState
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
protected DefaultOperatorStateBackendSnapshotStrategy(
ClassLoader userClassLoader,
Map<String, PartitionableListState<?>> registeredOperatorStates,
Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
this.userClassLoader = userClassLoader;
this.registeredOperatorStates = registeredOperatorStates;
this.registeredBroadcastStates = registeredBroadcastStates;
}
......
}
In the synchronous resource-preparation phase, DefaultOperatorStateBackendSnapshotStrategy does only one thing: it deep-copies the ListState and BroadcastState. The deep copy synchronously captures the snapshot of the state at this instant, which is what preserves exactly-once semantics.
@Override
public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {
//holds the deep copies of the operator state
final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
new HashMap<>(registeredOperatorStates.size());
final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
new HashMap<>(registeredBroadcastStates.size());
ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(userClassLoader);
try {
//deep-copy the registered ListState (and, below, BroadcastState) instances for later use
if (!registeredOperatorStates.isEmpty()) {
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStates.entrySet()) {
PartitionableListState<?> listState = entry.getValue();
if (null != listState) {
listState = listState.deepCopy();
}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
}
}
//deep-copy the broadcast state
if (!registeredBroadcastStates.isEmpty()) {
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStates.entrySet()) {
BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
if (null != broadcastState) {
broadcastState = broadcastState.deepCopy();
}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
}
}
} finally {
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}
return new DefaultOperatorStateBackendSnapshotResources(
registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
}
Once the operator state has been deep-copied, the asyncSnapshot method asynchronously writes the snapshot data to the CheckpointStreamFactory.
@Override
public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
DefaultOperatorStateBackendSnapshotResources syncPartResource,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) {
......
return (snapshotCloseableRegistry) -> {
//create the output stream
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(
CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
......
//write the snapshot data to the output stream via OperatorBackendSerializationProxy
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(
operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
backendSerializationProxy.write(dov);
......
return SnapshotResult.of(retValue);
} else {
throw new IOException("Stream was already unregistered.");
}
};
}