Flink Source Code Analysis

Understanding the Full Flink Checkpoint Snapshot Process in One Article

2020-07-06  shengjk1
Preface

In the previous article we walked through the overall checkpoint flow. In this one we look in detail at how the snapshot is actually taken during a checkpoint.

Main Text

From the overall checkpoint flow, we know that the following code

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // Entry point for calling snapshotState on each StreamOperator.
        // Source-side operators first (flatMap -> source), then sink-side operators (sink -> filter).
        for (StreamOperator<?> op : allOperators) {
            // Snapshot each operator and store the in-progress result in operatorSnapshotsInProgress
            // (what is stored is a reference to the asynchronous part of the checkpoint);
            // the local checkpoint store and the JobManager ack happen afterwards.
            // Note: catching the barrier is itself part of processing input data,
            // i.e. of StreamInputProcessor.processInput().
            checkpointStreamOperator(op);
        }
......
}
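To make the split between the synchronous and asynchronous parts of this loop concrete, here is a minimal Java sketch. The interface SnapshottableOperator and the helpers op and checkpointAll are hypothetical stand-ins, not Flink's API. The assumption being illustrated is that each operator captures its state on the task thread and hands back a RunnableFuture whose run() later writes that state out, which is the role of the futures kept in operatorSnapshotsInProgress.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SyncPhaseSketch {
    // Hypothetical stand-in for a StreamOperator; not Flink's API.
    public interface SnapshottableOperator {
        String id();
        // Synchronous part: capture state now, return the asynchronous part as a future.
        RunnableFuture<String> snapshotState(long checkpointId);
    }

    public static SnapshottableOperator op(String name) {
        return new SnapshottableOperator() {
            public String id() { return name; }
            public RunnableFuture<String> snapshotState(long checkpointId) {
                String captured = name + "@" + checkpointId;              // captured synchronously
                return new FutureTask<>(() -> "handle(" + captured + ")"); // written out later
            }
        };
    }

    // Mirrors the loop in executeCheckpointing: snapshot every operator of the chain in order
    // and keep the in-progress futures, keyed by operator id.
    public static Map<String, RunnableFuture<String>> checkpointAll(
            List<SnapshottableOperator> allOperators, long checkpointId) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (SnapshottableOperator op : allOperators) {
            inProgress.put(op.id(), op.snapshotState(checkpointId));
        }
        return inProgress;
    }

    public static void main(String[] args) throws Exception {
        Map<String, RunnableFuture<String>> inProgress =
            checkpointAll(List.of(op("flatMap"), op("source")), 7L);
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        for (Map.Entry<String, RunnableFuture<String>> e : inProgress.entrySet()) {
            asyncPool.execute(e.getValue()); // the async part runs off the task thread
            System.out.println(e.getKey() + " -> " + e.getValue().get());
        }
        asyncPool.shutdown();
    }
}
```

Running main prints `flatMap -> handle(flatMap@7)` and then `source -> handle(source@7)`. The task thread only pays for capturing state; the expensive write happens on the executor.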

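is where the snapshots are taken: checkpointStreamOperator(op) calls each operator's snapshotState, which is implemented as follows (AbstractStreamOperator.snapshotState):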

    // All of the operator's state is snapshotted (persisted) from this single place
    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // Persist managed state (operator state and keyed state)
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    // Trigger an asynchronous snapshot of the (internal) DefaultOperatorStateBackend
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // In the job source -> flatMap -> rebalance -> filter -> keyBy -> sink, keyedStateBackend is
            // non-null only for the sink (the operator after keyBy), so only there is the keyed backend snapshotted
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    // Trigger an asynchronous snapshot of the keyed state backend
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            throw new Exception(snapshotFailMessage, snapshotException);
        }

        return snapshotInProgress;
    }
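The pattern in snapshotState (trigger a snapshot only for the backends that exist, collect the returned futures, and cancel them if anything fails) can be reduced to the following sketch. SnapshotFutures and Backend are hypothetical simplifications of OperatorSnapshotFutures and the state backends; raw state, the snapshot context and logging are left out.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotFuturesSketch {
    // Hypothetical, much simplified counterpart of OperatorSnapshotFutures.
    public static final class SnapshotFutures {
        public RunnableFuture<String> operatorStateManaged;
        public RunnableFuture<String> keyedStateManaged;

        void cancel() {
            if (operatorStateManaged != null) operatorStateManaged.cancel(true);
            if (keyedStateManaged != null) keyedStateManaged.cancel(true);
        }
    }

    // Hypothetical backend: snapshot() does the synchronous part and returns the asynchronous part.
    public interface Backend {
        RunnableFuture<String> snapshot(long checkpointId) throws Exception;
    }

    public static SnapshotFutures snapshotState(long checkpointId, Backend operatorBackend, Backend keyedBackend)
            throws Exception {
        SnapshotFutures futures = new SnapshotFutures();
        try {
            if (operatorBackend != null) {
                futures.operatorStateManaged = operatorBackend.snapshot(checkpointId);
            }
            // Only keyed operators (after keyBy) have a keyed backend.
            if (keyedBackend != null) {
                futures.keyedStateManaged = keyedBackend.snapshot(checkpointId);
            }
        } catch (Exception snapshotException) {
            // Cancel whatever was already triggered, keeping any cancel failure as suppressed.
            try {
                futures.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            throw new Exception("Could not complete snapshot " + checkpointId, snapshotException);
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        // e.g. a filter operator: operator state backend only, no keyed backend
        Backend operatorBackend = id -> new FutureTask<>(() -> "operator-state-" + id);
        SnapshotFutures f = snapshotState(3L, operatorBackend, null);
        f.operatorStateManaged.run(); // in Flink this is the asynchronous part
        System.out.println(f.operatorStateManaged.get()); // operator-state-3
        System.out.println(f.keyedStateManaged == null);  // true
    }
}
```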

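So when operatorStateBackend is non-null, operatorStateBackend.snapshot is called, and when keyedStateBackend is non-null, keyedStateBackend.snapshot is called.
Here we take the RocksDB incremental snapshot as the example: keyedStateBackend.snapshot ends up in RocksIncrementalSnapshotStrategy.doSnapshot, which builds a RocksDBIncrementalSnapshotOperation (the code comments of operatorStateBackend.snapshot already explain that path clearly).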

    @Nonnull
    @Override
    protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
        long checkpointId,
        long checkpointTimestamp,
        @Nonnull CheckpointStreamFactory checkpointStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

        // kvStateInformation is populated in RocksDBIncrementalRestoreOperation:
        //  kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
        final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);

        // Take a native RocksDB checkpoint into snapshotDirectory, preparing for RocksDBIncrementalSnapshotOperation.uploadSstFiles
        takeDBNativeCheckpoint(snapshotDirectory);

        // snapshot
        final RocksDBIncrementalSnapshotOperation snapshotOperation =
            new RocksDBIncrementalSnapshotOperation(
                checkpointId,
                checkpointStreamFactory,
                snapshotDirectory,
                baseSstFiles,
                stateMetaInfoSnapshots);

        return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
    }
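doSnapshot relies on the baseSstFiles returned by snapshotMetaData. As far as I understand the Flink 1.9/1.10 sources, this is the SST set recorded in materializedSstFiles (filled at the end of callInternal below) for the last completed checkpoint, and entries older than a newly completed checkpoint are pruned in notifyCheckpointComplete. The class below is a hypothetical sketch of that bookkeeping with file names as plain strings, not the actual RocksIncrementalSnapshotStrategy code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

public class SstBookkeepingSketch {
    // checkpointId -> SST file names covered by that checkpoint (hypothetical simplification)
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Synchronous part: the base of an incremental snapshot is the SST set of the last *completed* checkpoint.
    public synchronized Set<String> baseSstFiles() {
        Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
        return base == null ? Collections.emptySet() : base;
    }

    // Asynchronous part, after uploading: remember which SST files this checkpoint covers.
    public synchronized void recordMaterialized(long checkpointId, Set<String> sstFiles) {
        materializedSstFiles.put(checkpointId, new HashSet<>(sstFiles));
    }

    // Once the JobManager confirms a checkpoint, bookkeeping for older ones is no longer needed.
    public synchronized void notifyCheckpointComplete(long completedCheckpointId) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    public static void main(String[] args) {
        SstBookkeepingSketch s = new SstBookkeepingSketch();
        System.out.println(s.baseSstFiles()); // [] : the first checkpoint uploads everything
        s.recordMaterialized(1L, Set.of("000010.sst", "000011.sst"));
        System.out.println(s.baseSstFiles()); // [] : checkpoint 1 is not confirmed yet
        s.notifyCheckpointComplete(1L);
        System.out.println(new TreeSet<>(s.baseSstFiles())); // [000010.sst, 000011.sst]
    }
}
```

Basing the increment on the last *completed* checkpoint, rather than the last triggered one, means a checkpoint that later fails never becomes the base of a new one.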

Now let's step inside RocksDBIncrementalSnapshotOperation (its callInternal method):

        @Override
        protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {

            boolean completed = false;

            // Handle to the meta data file
            SnapshotResult<StreamStateHandle> metaStateHandle = null;
            // Handles to new sst files since the last completed checkpoint will go here
            final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
            // Handles to the misc files in the current snapshot will go here
            final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

            try {

                // Write the metadata (in full) to the checkpoint file system (HDFS here)
                metaStateHandle = materializeMetaData();

                // Sanity checks - they should never fail
                Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
                Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
                    "Metadata for job manager was not properly created.");

                // Upload the SST files created since the last completed checkpoint, plus the misc files, to the checkpoint file system
                uploadSstFiles(sstFiles, miscFiles);

                synchronized (materializedSstFiles) {
                    materializedSstFiles.put(checkpointId, sstFiles.keySet());
                }

                final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
                    new IncrementalRemoteKeyedStateHandle(
                        backendUID,
                        keyGroupRange,
                        checkpointId,
                        sstFiles,
                        miscFiles,
                        metaStateHandle.getJobManagerOwnedSnapshot());

                // localBackupDirectory is a PermanentSnapshotDirectory when local recovery is enabled; otherwise the returned handle is null
                final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
                final SnapshotResult<KeyedStateHandle> snapshotResult;
                if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {

                    // Incremental local snapshot
                    IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
                        new IncrementalLocalKeyedStateHandle(
                            backendUID,
                            checkpointId,
                            directoryStateHandle,
                            keyGroupRange,
                            metaStateHandle.getTaskLocalSnapshot(),
                            sstFiles.keySet());

                    //  Report the local snapshot to the local state manager and
                    //  the jobManagerState (jmIncrementalKeyedStateHandle) to the JobManager
                    snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
                } else {
                    // Report the jobManagerState (jmIncrementalKeyedStateHandle) to the JobManager
                    snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
                }

                completed = true;

                return snapshotResult;
            } finally {
                if (!completed) {
                    final List<StateObject> statesToDiscard =
                        new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
                    statesToDiscard.add(metaStateHandle);
                    statesToDiscard.addAll(miscFiles.values());
                    statesToDiscard.addAll(sstFiles.values());
                    cleanupIncompleteSnapshot(statesToDiscard);
                }
            }
        }
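callInternal uses a completed flag together with a finally block, so that every state handle already written (metadata, SST and misc files) is discarded if the snapshot fails midway. A minimal sketch of that pattern follows; the Uploader interface and the discarded list are hypothetical placeholders for the real upload and cleanupIncompleteSnapshot calls.

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOnFailureSketch {
    // Hypothetical uploader: returns a remote handle name, or throws to simulate a failed upload.
    public interface Uploader {
        String upload(String file) throws Exception;
    }

    // Mirrors the completed-flag + finally pattern of callInternal: anything already
    // uploaded is handed to cleanup if the snapshot does not finish.
    public static List<String> snapshot(List<String> files, Uploader uploader, List<String> discarded)
            throws Exception {
        boolean completed = false;
        List<String> uploaded = new ArrayList<>();
        try {
            for (String f : files) {
                uploaded.add(uploader.upload(f));
            }
            completed = true;
            return uploaded;
        } finally {
            if (!completed) {
                discarded.addAll(uploaded); // stands in for cleanupIncompleteSnapshot(statesToDiscard)
            }
        }
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        try {
            snapshot(List.of("a.sst", "b.sst", "MANIFEST"), f -> {
                if (f.equals("MANIFEST")) throw new Exception("upload failed");
                return "remote/" + f;
            }, discarded);
        } catch (Exception e) {
            System.out.println(e.getMessage() + ", discarded " + discarded);
        }
    }
}
```

Running main prints `upload failed, discarded [remote/a.sst, remote/b.sst]`: the finally block runs before the exception reaches the caller, so no half-written checkpoint is left behind.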

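The metadata is persisted in full on every checkpoint, whereas for the data itself only the SST files created since the last completed checkpoint are uploaded to the checkpoint file system, together with the misc files (which are uploaded each time).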
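The incremental rule above can be sketched as a classification of the files in the local RocksDB checkpoint directory. This is a hypothetical simplification of what uploadSstFiles does: names are plain strings and the Plan class is invented for illustration. In Flink, to my understanding, the reused SST files are recorded with placeholder handles that point back to the shared state of an earlier checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class IncrementalUploadSketch {
    // Hypothetical result of classifying the local checkpoint directory.
    public static final class Plan {
        public final List<String> sstToUpload = new ArrayList<>();
        public final List<String> sstReused = new ArrayList<>();
        public final List<String> miscToUpload = new ArrayList<>();
    }

    // localFiles: names found in the local RocksDB checkpoint directory.
    // baseSstFiles: SST files already covered by the last completed checkpoint.
    public static Plan plan(List<String> localFiles, Set<String> baseSstFiles) {
        Plan plan = new Plan();
        for (String name : localFiles) {
            if (name.endsWith(".sst")) {
                // SST files are immutable, so one already in remote storage is referenced, not re-uploaded
                if (baseSstFiles.contains(name)) {
                    plan.sstReused.add(name);
                } else {
                    plan.sstToUpload.add(name);
                }
            } else {
                // Everything else (MANIFEST, CURRENT, OPTIONS, ...) is a misc file, uploaded every time
                plan.miscToUpload.add(name);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Plan p = plan(
            List.of("000012.sst", "000013.sst", "MANIFEST-000005", "CURRENT", "OPTIONS-000007"),
            Set.of("000012.sst"));
        System.out.println("upload SST:  " + p.sstToUpload);  // [000013.sst]
        System.out.println("reuse SST:   " + p.sstReused);    // [000012.sst]
        System.out.println("upload misc: " + p.miscToUpload); // [MANIFEST-000005, CURRENT, OPTIONS-000007]
    }
}
```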
