Flink

State source code analysis

2018-06-12  edd72e125c98

KvState -> AbstractHeapState

Function state

ListCheckpointed

A function implements ListCheckpointed to support list-style state redistribution; the older Checkpointed interface is deprecated.
List<T> snapshotState(long checkpointId, long timestamp)
void restoreState(List<T> state)

CheckpointedAsynchronously: in the current version, asynchronous snapshots are not yet implemented when using ListCheckpointed.
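To illustrate list-style redistribution, here is a minimal sketch that does not depend on Flink. ListCheckpointedSketch is a hypothetical stand-in with the same two method signatures as ListCheckpointed, and CountingFunction is an illustrative user function. The point is that state is handed out as a list of independent elements, so after rescaling a subtask may receive any subset of them and has to merge them in restoreState().

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in mirroring the two ListCheckpointed methods (not the Flink interface itself).
interface ListCheckpointedSketch<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// Illustrative function that counts records and keeps the count as list-style state.
class CountingFunction implements ListCheckpointedSketch<Long> {
    private long count;

    void process(String record) {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // One element per subtask; the runtime may later hand the elements to different subtasks.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, one subtask may receive several elements (or none), so merge them.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```

Summing the partial counts works here because a count is additive; state that cannot be split and merged this way needs a different restore strategy.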

CheckpointedFunction

FlinkKafkaConsumerBase :> CheckpointedFunction, CheckpointListener
Essentially it uses snapshotState(), initializeState(), and notifyCheckpointComplete() to manage the pending offsets and offsetsStateForCheckpoint.

/** Data for pending but uncommitted offsets */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
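The pending-offsets bookkeeping can be sketched as follows. This is a simplified, hypothetical model, not FlinkKafkaConsumerBase itself: it uses java.util.LinkedHashMap instead of the commons-collections LinkedMap, and the actual commit to Kafka is replaced by appending to a list. snapshotState() records the current offsets under the checkpoint id; when notifyCheckpointComplete(id) arrives, the offsets of that checkpoint are committed and that entry plus all older pending entries are dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetCommitSketch {
    // partition -> latest offset, updated as records are consumed
    private final Map<Integer, Long> currentOffsets = new HashMap<>();

    // checkpointId -> offsets snapshot, kept in insertion (= checkpoint) order
    private final LinkedHashMap<Long, Map<Integer, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    // stands in for the commit to Kafka
    final List<Map<Integer, Long>> committed = new ArrayList<>();

    void onRecord(int partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already subsumed checkpoint
        }
        // drop this checkpoint and every older one: they are subsumed by the completed checkpoint
        Iterator<Long> it = pendingOffsetsToCommit.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
        committed.add(offsets);
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }
}
```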

Operator-level state

StreamOperator

An operator has to implement this interface, including
.snapshotState()

CheckpointedRestoringOperator

void restoreState(FSDataInputStream in)

key groups

To dynamically scale Flink operators that use partitioned (key-value) state, Flink uses the concept of key groups to group multiple keys together.

KeyGroupRangeOffsets denotes the range of all key groups that are referenced by the handle, together with their respective stream offsets.
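A sketch of how keys map to key groups and key groups map to subtasks. The range formulas follow what Flink's KeyGroupRangeAssignment computes, as far as I know; for the key hash, Flink additionally applies a murmur hash to key.hashCode(), which this sketch replaces with a plain Math.floorMod for simplicity. The class and method names are illustrative.

```java
class KeyGroupAssignmentSketch {

    // Simplified: Flink scrambles hashCode() with a murmur hash first; here it is used directly.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // First key group (inclusive) owned by the subtask with the given index.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask with the given index.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Which subtask owns a key group: the inverse of the range computation above.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With maxParallelism = 128 and parallelism = 3, subtask 0 owns key groups 0-42, subtask 1 owns 43-85 and subtask 2 owns 86-127. Rescaling only moves whole key groups between subtasks; a key never changes its key group.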

RetrievableStateHandle and StreamStateHandle
Both give access to state that was written to a checkpoint output stream. RetrievableStateHandle returns a directly usable object, while StreamStateHandle returns a seekable input stream.

CheckpointStateOutputStream is the single entry point through which all state is serialized.

operator state and keyed-state

The state of an operator is divided into operator state (= non-partitioned state) and keyed state (= partitioned state).
Keyed state is organized as a List<KeyGroupsStateHandle>. Each KeyGroupsStateHandle consists of one StreamStateHandle and one KeyGroupRangeOffsets object. KeyGroupRangeOffsets denotes the range of all key groups that are referenced by the handle, together with their respective stream offsets. The StreamStateHandle gives access to a seekable stream that contains the actual state data for all key groups from the key group range; individual key group states are located in the stream at their previously mentioned stream offsets.

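A KeyGroupsStateHandle contains one KeyGroupRangeOffsets and one StreamStateHandle:
KeyGroupRangeOffsets: holds a KeyGroupRange (the indices of the key groups this handle covers) plus the offset of each key group within the stream.
StreamStateHandle: manages the state stream; it is the carrier that actually holds the data.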
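KeyGroupRangeOffsets can be pictured as a key group range plus one stream offset per key group. The following is a hypothetical, simplified model (the class name is illustrative; Flink's own class has similar lookup and intersection methods). It shows why the offsets matter when rescaling: a subtask that now owns only part of the range intersects it with its own range and can seek directly to the data of its key groups.

```java
import java.util.Arrays;

// Simplified model of KeyGroupRangeOffsets: key groups [start, end] and one stream offset per key group.
class KeyGroupRangeOffsetsSketch {
    final int start;
    final int end;
    final long[] offsets;

    KeyGroupRangeOffsetsSketch(int start, int end, long[] offsets) {
        if (offsets.length != end - start + 1) {
            throw new IllegalArgumentException("need exactly one offset per key group");
        }
        this.start = start;
        this.end = end;
        this.offsets = offsets;
    }

    long getKeyGroupOffset(int keyGroup) {
        if (keyGroup < start || keyGroup > end) {
            throw new IllegalArgumentException("key group " + keyGroup + " not in [" + start + ", " + end + "]");
        }
        return offsets[keyGroup - start];
    }

    // Keeps only the key groups that also lie in [otherStart, otherEnd], e.g. the range of a rescaled subtask.
    KeyGroupRangeOffsetsSketch intersect(int otherStart, int otherEnd) {
        int s = Math.max(start, otherStart);
        int e = Math.min(end, otherEnd);
        if (s > e) {
            return new KeyGroupRangeOffsetsSketch(s, s - 1, new long[0]); // empty range
        }
        return new KeyGroupRangeOffsetsSketch(s, e, Arrays.copyOfRange(offsets, s - start, e - start + 1));
    }
}
```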

These two kinds of state appear to be used in StateAssignmentOperation, the class responsible for reassigning state after recovering from a checkpoint, which redistributes the key groups.

AbstractStreamOperator

AbstractStreamOperator :> StreamOperator; it mainly implements two methods, snapshotState() and initializeState().

KeyedStateCheckpointOutputStream和OperatorStateCheckpointOutputStream

These two streams are how AbstractStreamOperator implements raw state; managed state is implemented through the state backend.
KeyedStateCheckpointOutputStream achieves redistribution through key groups.
OperatorStateCheckpointOutputStream uses list-style redistribution: each operator returns a list of state elements (tracked as partition offsets in a LongArrayList, partitionOffsets).

StateSnapshotContextSynchronousImpl
  1. getRawKeyedOperatorStateOutput() and getRawOperatorStateOutput() build the corresponding CheckpointOutputStream.
  2. getKeyedStateStreamFuture() and getOperatorStateStreamFuture() both call closeAndGetHandle() on the delegate CheckpointOutputStream and put the resulting raw-state handle into a DoneFuture, which means raw state can only be snapshotted synchronously, not asynchronously.
OperatorSnapshotResult

The result of snapshotState(). It holds four futures: raw and managed, each for keyed and operator state.

private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
    private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
    private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
    private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;

Back to AbstractStreamOperator: this method overrides the one declared in StreamOperator.

OperatorSnapshotResult snapshotState(
        long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {

    // get the key group range of this operator
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
            keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

    OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

    try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            streamFactory,
            keyGroupRange,
            getContainingTask().getCancelables())) {

        // write the timer services into the raw keyed state checkpoint stream
        snapshotState(snapshotContext);

        // store the raw-state results in the OperatorSnapshotResult
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        // raw state is done; now snapshot the managed state
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
        }

        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
        }
    }
    // ....... exception handling omitted

    return snapshotInProgress;
}

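This method mainly snapshots the operator's HeapInternalTimerService instances, writing them into the KeyedStateCheckpointOutputStream obtained from getRawKeyedOperatorStateOutput(); this is where raw keyed state is handled. Subclasses override this method to add their own logic.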

public void snapshotState(StateSnapshotContext context) throws Exception {
    KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput();

    try {
        KeyGroupsList allKeyGroups = out.getKeyGroupList();
        for (int keyGroupIdx : allKeyGroups) {
            // start a new key group; the stream records where it begins
            out.startNewKeyGroup(keyGroupIdx);

            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
            dov.writeInt(timerServices.size());

            for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
                String serviceName = entry.getKey();
                HeapInternalTimerService<?, ?> timerService = entry.getValue();

                dov.writeUTF(serviceName);
                timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
            }
        }
    } finally {
        // simplified: detailed error handling omitted
        out.close();
    }
}
initializeState()

It mainly restores the four kinds of state (managed state through the backends, plus raw state) from the given stateHandles.

@Override
public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    // two raw collections and one managed (backend) collection
    Collection<KeyGroupsStateHandle> keyedStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesBackend = null;

    // initializes keyedStateStore; the keyedStateBackend field is initialized through the StreamTask
    initKeyedState();
    .......
    // created through the StreamTask, then assigned to the operatorStateBackend field
    initOperatorState(operatorStateHandlesBackend);

    StateInitializationContext initializationContext = new StateInitializationContextImpl(
        restoring, // information whether we restore or start for the first time
        operatorStateBackend, // access to operator state backend
        keyedStateStore, // access to keyed state backend
        keyedStateHandlesRaw, // access to keyed state stream
        operatorStateHandlesRaw, // access to operator state stream
        getContainingTask().getCancelables()); // access to register streams for canceling

    // restores the timer services from the raw keyed state
    initializeState(initializationContext);
}

NOTE: AbstractStreamOperator has a processWatermark method, which internally calls service.advanceWatermark(mark.getTimestamp()) on the timer services.

getPartitionedState()

Internally it calls keyedStateBackend.getPartitionedState; partitioned state is simply keyed state.

    protected <S extends State, N> S getPartitionedState(
            N namespace, TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor) throws Exception {

        if (keyedStateStore != null) {
            return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
.......

State interface

State

ValueState: single-value state
ListState: list state
FoldingState: folding state, for FoldFunction
ReducingState: reducing state, for ReduceFunction
These interfaces have concrete implementations in the runtime package and in the RocksDB package.



StateDescriptor
Every descriptor provides bind(), which creates a new state on the given backend.


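ValueStateDescriptor provides a constructor that takes a TypeInformation, from which the TypeSerializer can be constructed lazily. TypeInformation has subclasses that provide the defaults for the various classes.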
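bind() is a double dispatch: the descriptor knows which kind of state it describes and calls the matching create method on whatever backend it is given. A minimal sketch with hypothetical, simplified types (StateBinderSketch, ValueDescriptorSketch, ListDescriptorSketch, HeapBinderSketch), not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical backend-side factory: one create method per state type.
interface StateBinderSketch {
    <T> ValueStateSketch<T> createValueState(ValueDescriptorSketch<T> desc);

    <T> ListStateSketch<T> createListState(ListDescriptorSketch<T> desc);
}

interface ValueStateSketch<T> {
    T value();

    void update(T value);
}

interface ListStateSketch<T> {
    Iterable<T> get();

    void add(T value);
}

// Each descriptor subclass picks the matching create method: a factory method via double dispatch.
abstract class DescriptorSketch<S> {
    final String name;

    DescriptorSketch(String name) {
        this.name = name;
    }

    abstract S bind(StateBinderSketch binder);
}

class ValueDescriptorSketch<T> extends DescriptorSketch<ValueStateSketch<T>> {
    ValueDescriptorSketch(String name) {
        super(name);
    }

    @Override
    ValueStateSketch<T> bind(StateBinderSketch binder) {
        return binder.createValueState(this);
    }
}

class ListDescriptorSketch<T> extends DescriptorSketch<ListStateSketch<T>> {
    ListDescriptorSketch(String name) {
        super(name);
    }

    @Override
    ListStateSketch<T> bind(StateBinderSketch binder) {
        return binder.createListState(this);
    }
}

// A trivial in-memory backend: no keys or namespaces, just one value or list per state.
class HeapBinderSketch implements StateBinderSketch {
    @Override
    public <T> ValueStateSketch<T> createValueState(ValueDescriptorSketch<T> desc) {
        return new ValueStateSketch<T>() {
            private T current;

            public T value() { return current; }

            public void update(T value) { current = value; }
        };
    }

    @Override
    public <T> ListStateSketch<T> createListState(ListDescriptorSketch<T> desc) {
        List<T> elements = new ArrayList<>();
        return new ListStateSketch<T>() {
            public Iterable<T> get() { return elements; }

            public void add(T value) { elements.add(value); }
        };
    }
}
```

The backend never needs a switch over state types; adding a new state type means adding one create method and one descriptor subclass.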

KvState

It provides two methods:

/**
     * Sets the current namespace, which will be used when using the state access methods.
     *
     * @param namespace The namespace.
     */
    void setCurrentNamespace(N namespace);

    /**
     * Returns the serialized value for the given key and namespace.
     *
     * <p>If no value is associated with key and namespace, <code>null</code>
     * is returned.
     *
     * @param serializedKeyAndNamespace Serialized key and namespace
     * @return Serialized value or <code>null</code> if no value is associated
     * with the key and namespace.
     * @throws Exception Exceptions during serialization are forwarded
     */
    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;

KvState has three implementations: generic, heap, and RocksDB.



AbstractHeapState

AbstractHeapState:> KvState
StateTable

// the map that actually stores the key/value pairs in the heap implementation
    /** Map containing the actual key/value pairs */
    protected final StateTable<K, N, SV> stateTable;

Internally it stores List<Map<N, Map<K, ST>>> state;
Each list element holds the state of one key group; the map goes namespace -> (key -> actual value).

HeapValueState
class HeapValueState<K, N, V>
        extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {

It inherits from two types:
ValueState:> State
AbstractHeapState :> KVState (get)

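What is the relationship between a state's name and its namespace? The name is the user-assigned name of the state, whereas the namespace further scopes the state, e.g. the window in window operators (VoidNamespace when no namespace is needed).
For keyed state, what is the key returned by getCurrentKey()? It is the key of the current record, from which the key group is derived; it is not the state's name.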

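When keyedStateBackend.getPartitionedState() is called, it looks up the state (a KvState) by name in keyValueStatesByName and calls state.setCurrentNamespace(); then HeapValueState.value() is called (ValueState has two methods, value() and update()). value() asks the KeyedStateBackend for the current key and its key group index, uses the key group index to get the namespace map from the StateTable, and then uses the namespace and the key to get the value. update() works the same way.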
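The lookup path key group index -> namespace -> key -> value can be sketched on top of the List<Map<N, Map<K, ST>>> layout. This is a simplified, hypothetical model: StateTableSketch and ValueStateOnTable are illustrative names, and the key group is computed with a plain Math.floorMod of hashCode() rather than Flink's hashing. The current key and namespace are set from outside, as the keyed backend and setCurrentNamespace() do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified StateTable: one Map<namespace, Map<key, value>> per key group.
class StateTableSketch<K, N, V> {
    private final List<Map<N, Map<K, V>>> state = new ArrayList<>();

    StateTableSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            state.add(new HashMap<>());
        }
    }

    V get(int keyGroup, N namespace, K key) {
        Map<K, V> byKey = state.get(keyGroup).get(namespace);
        return byKey == null ? null : byKey.get(key);
    }

    void put(int keyGroup, N namespace, K key, V value) {
        state.get(keyGroup).computeIfAbsent(namespace, n -> new HashMap<>()).put(key, value);
    }
}

// Simplified heap value state: value()/update() resolve the current key and namespace.
class ValueStateOnTable<K, N, V> {
    private final StateTableSketch<K, N, V> table;
    private final int numKeyGroups;
    private K currentKey;        // set by the keyed backend for each record
    private N currentNamespace;  // set via setCurrentNamespace()

    ValueStateOnTable(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
        this.table = new StateTableSketch<>(numKeyGroups);
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    private int currentKeyGroup() {
        return Math.floorMod(currentKey.hashCode(), numKeyGroups);
    }

    V value() {
        return table.get(currentKeyGroup(), currentNamespace, currentKey);
    }

    void update(V value) {
        table.put(currentKeyGroup(), currentNamespace, currentKey, value);
    }
}
```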

AbstractStateBackend

It provides three methods:

/**
 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
 * that should end up in a checkpoint (i.e. streams used for checkpointing).
 */
public abstract CheckpointStreamFactory createStreamFactory(...);

public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...);

public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {
    return new DefaultOperatorStateBackend(env.getUserClassLoader());
}

NOTE: this method is not abstract; it is a concrete method with a default implementation.

AbstractKeyedStateBackend

FsStateBackend and MemoryStateBackend use HeapKeyedStateBackend (returned by their createKeyedStateBackend() implementations).
RocksDBStateBackend uses RocksDBKeyedStateBackend.


It provides many key-related methods (inherited from KeyedStateBackend).


The core method of this class is getPartitionedState().

public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
    // some checks omitted .......

    // keyValueStatesByName maps state name -> KvState; create the map if it does not exist yet
    if (keyValueStatesByName == null) {
        keyValueStatesByName = new HashMap<>();
    }

    // lastName and lastState are a one-entry cache; only the namespace has to be set
    if (lastName != null && lastName.equals(stateDescriptor.getName())) {
        lastState.setCurrentNamespace(namespace);
        return (S) lastState;
    }

    // look the state up in keyValueStatesByName and store it in the lastXXX cache
    KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
    if (previous != null) {
        lastState = previous;
        lastState.setCurrentNamespace(namespace);
        lastName = stateDescriptor.getName();
        return (S) previous;
    }

    // create a new blank key/value state
    // stateDescriptor.bind() binds the state to this backend: depending on the descriptor subclass,
    // bind() calls the create method for the matching state type, similar to a factory method
    S state = stateDescriptor.bind(new StateBackend() {
        @Override
        public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
            return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
        }

        @Override
        public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
            return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
        }

        @Override
        public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
            return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
        }

        @Override
        public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
            return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
        }
    });

    KvState kvState = (KvState) state;

    keyValueStatesByName.put(stateDescriptor.getName(), kvState);

    lastName = stateDescriptor.getName();
    lastState = kvState;

    kvState.setCurrentNamespace(namespace);

    // publish queryable state .......

    return state;
}

The keys of keyValueStatesByName are the state names.
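The caching in getPartitionedState() (a name -> state map plus a one-entry lastName/lastState cache, with the namespace reset on every call) can be isolated in a small sketch. PartitionedStateRegistry and NamespacedState are hypothetical names, and creation goes through a plain Supplier instead of StateDescriptor.bind(). It only shows that repeated calls with the same name return the same object and merely switch its namespace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical minimal state that only tracks its current namespace.
class NamespacedState {
    Object currentNamespace;

    void setCurrentNamespace(Object namespace) {
        this.currentNamespace = namespace;
    }
}

class PartitionedStateRegistry {
    private final Map<String, NamespacedState> statesByName = new HashMap<>();
    private String lastName;           // one-entry cache: name of the last requested state
    private NamespacedState lastState; // ... and the state itself
    int creations;                     // how often a new state had to be created

    NamespacedState getPartitionedState(String name, Object namespace, Supplier<NamespacedState> factory) {
        // fast path: same state as last time, only the namespace changes
        if (lastName != null && lastName.equals(name)) {
            lastState.setCurrentNamespace(namespace);
            return lastState;
        }
        NamespacedState previous = statesByName.get(name);
        if (previous == null) {
            // slow path: create and register a new state (bind() in the real code)
            previous = factory.get();
            statesByName.put(name, previous);
            creations++;
        }
        // refresh the cache and set the namespace
        lastName = name;
        lastState = previous;
        previous.setCurrentNamespace(namespace);
        return previous;
    }
}
```

Because the same object is returned each time, setting a new namespace also changes what earlier references to that state see, which is why the namespace is set on every call.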

Snapshotable

All backends implement its snapshot() and restore() methods.
These two methods are what actually take a checkpoint and restore from one.

snapshot
Each backend has its own implementation; HeapKeyedStateBackend is used as the example here.

public RunnableFuture<KeyGroupsStateHandle> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception {

    // CheckpointStateOutputStream is the stream that actually writes the checkpoint to external storage
    try (CheckpointStreamFactory.CheckpointStateOutputStream stream =
            streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp)) {

        // wrapper that provides the write...() methods
        DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
        .....

        // in the end, all state in stateTables is written to outView
        // (Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();)

        // collect the meta info into metaInfoProxyList
        for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {

            RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
            KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
                    metaInfo.getStateType(),
                    metaInfo.getName(),
                    metaInfo.getNamespaceSerializer(),
                    metaInfo.getStateSerializer());

            metaInfoProxyList.add(metaInfoProxy);
            kVStateToId.put(kvState.getKey(), kVStateToId.size());
        }

        // write the checkpoint
        for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
            // remember the offset of each key group while writing; it goes into the returned handle
            keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
            outView.writeInt(keyGroupIndex);
            for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
                outView.writeShort(kVStateToId.get(kvState.getKey()));
                writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
            }
        }

        // the StreamStateHandle to return
        StreamStateHandle streamStateHandle = stream.closeAndGetHandle();

        KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);

        final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
        // return the KeyGroupsStateHandle in an already completed DoneFuture (synchronous snapshot)
        return new DoneFuture<>(keyGroupsStateHandle);
    }
}
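The write loop above can be reproduced in a self-contained sketch, with java.io streams in place of CheckpointStateOutputStream and DataOutputViewStreamWrapper. It follows the same layout (per key group: record the stream position, write the key group index, then for each state its short id followed by that state's data for the key group), but the per-state payload (an entry count plus UTF key and int value) is a hypothetical simplification of writeStateTableForKeyGroup(). Reading a single key group back then only needs its recorded offset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified heap snapshot: one byte stream with all key groups, plus the offset of each key group.
class HeapSnapshotSketch {
    final int startKeyGroup;
    final long[] keyGroupOffsets;                                 // like keyGroupRangeOffsets
    final Map<String, Integer> stateToId = new LinkedHashMap<>(); // like kVStateToId
    final byte[] data;

    // stateTables: state name -> one (key -> value) map per key group in [start, end]
    HeapSnapshotSketch(Map<String, List<Map<String, Integer>>> stateTables, int start, int end) {
        this.startKeyGroup = start;
        this.keyGroupOffsets = new long[end - start + 1];
        for (String name : stateTables.keySet()) {
            stateToId.put(name, stateToId.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int kg = start; kg <= end; kg++) {
                keyGroupOffsets[kg - start] = out.size(); // remember where this key group starts
                out.writeInt(kg);
                for (Map.Entry<String, List<Map<String, Integer>>> state : stateTables.entrySet()) {
                    out.writeShort(stateToId.get(state.getKey()));
                    // hypothetical payload standing in for writeStateTableForKeyGroup()
                    Map<String, Integer> entries = state.getValue().get(kg - start);
                    out.writeInt(entries.size());
                    for (Map.Entry<String, Integer> e : entries.entrySet()) {
                        out.writeUTF(e.getKey());
                        out.writeInt(e.getValue());
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.data = bytes.toByteArray();
    }

    // Reads one key group by jumping straight to its offset: state id -> (key -> value).
    Map<Integer, Map<String, Integer>> readKeyGroup(int kg) {
        int offset = (int) keyGroupOffsets[kg - startKeyGroup];
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset))) {
            if (in.readInt() != kg) {
                throw new IOException("unexpected key group at offset " + offset);
            }
            Map<Integer, Map<String, Integer>> result = new HashMap<>();
            for (int i = 0; i < stateToId.size(); i++) {
                int stateId = in.readShort();
                int count = in.readInt();
                Map<String, Integer> entries = new HashMap<>();
                for (int j = 0; j < count; j++) {
                    entries.put(in.readUTF(), in.readInt());
                }
                result.put(stateId, entries);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```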

restore: not analyzed yet.



RocksDB backend

AbstractRocksDBState

writeKeyWithGroupAndNamespace() - writes the state-related key components (key group, key, namespace) into keySerializationStream; the result is the key that gets written to RocksDB.

protected void writeKeyWithGroupAndNamespace(
            int keyGroup, K key, N namespace,
            ByteArrayOutputStreamWithPos keySerializationStream,
            DataOutputView keySerializationDataOutputView) throws IOException {

        Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");

        keySerializationStream.reset();
        writeKeyGroup(keyGroup, keySerializationDataOutputView);
        writeKey(key, keySerializationStream, keySerializationDataOutputView);
        writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
    }
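The key-group prefix in front of every RocksDB key is a big-endian integer of keyGroupPrefixBytes bytes; as far as I know Flink uses 1 byte when maxParallelism is at most 128 and 2 bytes otherwise. A self-contained sketch with hypothetical helper names shows the encoding and why the prefix bytes must be masked with 0xFF when decoding, since Java bytes are signed. The real writeKey()/writeNameSpace() may additionally write length information for variable-length serializers; this sketch simply concatenates the bytes.

```java
class KeyGroupPrefixSketch {

    // 1 prefix byte is enough for up to 128 key groups, otherwise 2.
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    // Big-endian key group prefix followed by the already serialized key and namespace bytes.
    static byte[] compositeKey(int keyGroup, int prefixBytes, byte[] key, byte[] namespace) {
        byte[] out = new byte[prefixBytes + key.length + namespace.length];
        for (int j = 0; j < prefixBytes; j++) {
            out[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        System.arraycopy(key, 0, out, prefixBytes, key.length);
        System.arraycopy(namespace, 0, out, prefixBytes + key.length, namespace.length);
        return out;
    }

    static int readKeyGroup(byte[] compositeKey, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; j++) {
            // & 0xFF undoes the sign extension of Java's signed byte
            keyGroup = (keyGroup << Byte.SIZE) | (compositeKey[j] & 0xFF);
        }
        return keyGroup;
    }
}
```

Without the mask, key group 200 (prefix bytes 0x00 0xC8) would decode as -56. Because the prefix is fixed-length and big-endian, RocksDB's bytewise key order also keeps all entries of one key group together.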

getSerializedValue() - essentially still uses the byte[] serializedKeyAndNamespace to fetch the value from RocksDB.

RocksDBValueState

Implements the two ValueState methods, update() and value().
update()

@Override
public void update(V value) throws IOException {
    if (value == null) {
        clear();
        return;
    }
    DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    try {
        // AbstractRocksDBState method: writes the current key of the keyed stream, its key group
        // and the namespace into keySerializationStream as the RocksDB key, then take the bytes
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        keySerializationStream.reset();
        // serialize the value
        valueSerializer.serialize(value, out);
        // write to the db; columnFamily is this state's column family, named after the state name from the StateDescriptor
        backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
RocksDBStateBackend :> AbstractStateBackend

Implements its methods for creating the operator and keyed state backends: createKeyedStateBackend() and createOperatorStateBackend().

RocksDBKeyedStateBackend

restore()

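snapshot() - chooses between an incremental and a full snapshot depending on whether this is a savepoint and on enableIncrementalCheckpointing (incremental only for regular checkpoints with incremental checkpointing enabled).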

snapshotFully() - fully asynchronous implementation

snapshotIncrementally() - semi-asynchronous
It consists of two steps: RocksDBIncrementalSnapshotOperation.takeSnapshot() and materializeSnapshot().

private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
            final long checkpointId,
            final long checkpointTimestamp,
            final CheckpointStreamFactory checkpointStreamFactory) throws Exception {

        final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
            new RocksDBIncrementalSnapshotOperation<>(
                this,
                checkpointStreamFactory,
                checkpointId,
                checkpointTimestamp);

        synchronized (asyncSnapshotLock) {
            if (db == null) {
                throw new IOException("RocksDB closed.");
            }

            if (!hasRegisteredState()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
                            checkpointTimestamp + " . Returning null.");
                }
                return DoneFuture.nullValue();
            }
            // RocksDB takes its own local checkpoint
            snapshotOperation.takeSnapshot();
        }

        return new FutureTask<KeyedStateHandle>(
            new Callable<KeyedStateHandle>() {
                @Override
                public KeyedStateHandle call() throws Exception { 
                    // materialize into the checkpoint directory on HDFS
                    return snapshotOperation.materializeSnapshot();
                }
            }
        ) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                snapshotOperation.stop();
                return super.cancel(mayInterruptIfRunning);
            }

            @Override
            protected void done() {
                // release resources
                snapshotOperation.releaseResources(isCancelled());
            }
        };
    }
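The pattern in snapshotIncrementally() (a short synchronous part under a lock, then a FutureTask whose call() does the slow upload, with cancel() and done() overridden for cleanup) can be demonstrated with plain java.util.concurrent. TwoPhaseSnapshotSketch and the string "handles" are hypothetical; the FutureTask behavior relied on is standard: done() runs once when the task completes normally, fails, or is cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class TwoPhaseSnapshotSketch {
    private final Object snapshotLock = new Object();
    final List<String> log = new ArrayList<>(); // records the phases; not thread-safe, fine for a demo

    RunnableFuture<String> snapshot(long checkpointId) {
        final String localCheckpoint;
        synchronized (snapshotLock) {
            // synchronous phase: should be cheap, e.g. RocksDB's local checkpoint via hard links
            localCheckpoint = "local-chk-" + checkpointId;
            log.add("sync " + localCheckpoint);
        }
        // asynchronous phase: runs when the future is executed, typically on another thread
        return new FutureTask<String>(() -> {
            log.add("upload " + localCheckpoint);
            return "remote-handle-" + checkpointId;
        }) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                log.add("stop " + localCheckpoint);
                return super.cancel(mayInterruptIfRunning);
            }

            @Override
            protected void done() {
                // called once the task is done: completed, failed or cancelled
                log.add("release " + localCheckpoint + (isCancelled() ? " (cancelled)" : ""));
            }
        };
    }
}
```

The point of the split is that record processing only has to wait for the synchronous part; the returned future can then be executed on a separate thread.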

RocksDBIncrementalSnapshotOperation

  1. RocksDBIncrementalSnapshotOperation.takeSnapshot() - synchronous; uses RocksDB's own checkpoint mechanism
    void takeSnapshot() throws Exception {
            assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));

            // use the last completed checkpoint as the comparison base.
            synchronized (stateBackend.materializedSstFiles) {
                baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
            }

            // save meta data
            for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                    : stateBackend.kvStateInformation.entrySet()) {
                stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
            }

            // save state data
            backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
            backupFileSystem = backupPath.getFileSystem();
            if (backupFileSystem.exists(backupPath)) {
                throw new IllegalStateException("Unexpected existence of the backup directory.");
            }

            // create hard links of living files in the checkpoint path
            Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
            // creates the actual checkpoint on the local disk, containing:
            // 1. hard links to the existing sst files
            // 2. copied manifest files and other files (possibly including newly created files)
            checkpoint.createCheckpoint(backupPath.getPath());
        }
  2. RocksDBIncrementalSnapshotOperation.materializeSnapshot() - asynchronous

    KeyedStateHandle materializeSnapshot() throws Exception {
    
       stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
    
       // write meta data
       metaStateHandle = materializeMetaData();
    
       // write state data
       Preconditions.checkState(backupFileSystem.exists(backupPath));
    
       FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
       if (fileStatuses != null) {
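          // iterate over the directory created by takeSnapshot(): sst files already saved by the base
          // checkpoint get a placeholder, replaced later from the shared state registry; all other files go
          // through materializeStateData(), which actually writes them to the remote file system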
          for (FileStatus fileStatus : fileStatuses) {
             final Path filePath = fileStatus.getPath();
             final String fileName = filePath.getName();
             final StateHandleID stateHandleID = new StateHandleID(fileName);
    
             if (fileName.endsWith(SST_FILE_SUFFIX)) {
                final boolean existsAlready =
                   baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
    
                if (existsAlready) {
                   // we introduce a placeholder state handle, that is replaced with the
                   // original from the shared state registry (created from a previous checkpoint)
                   sstFiles.put(
                      stateHandleID,
                      new PlaceholderStreamStateHandle());
                } else {
                   sstFiles.put(stateHandleID, materializeStateData(filePath));
                }
             } else {
                StreamStateHandle fileHandle = materializeStateData(filePath);
                miscFiles.put(stateHandleID, fileHandle);
             }
          }
       }
       synchronized (stateBackend.materializedSstFiles) {
          stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
       }
     
       return new IncrementalKeyedStateHandle(
          stateBackend.operatorIdentifier,
          stateBackend.keyGroupRange,
          checkpointId,
          sstFiles,
          miscFiles,
          metaStateHandle);
    }
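The core decision in materializeSnapshot(), upload an .sst file only if the base checkpoint does not already contain it and otherwise emit a placeholder, is a set difference. A simplified sketch with hypothetical names: file names stand in for StateHandleIDs, "uploading" just records the name, and the caller passes the id of the last completed checkpoint as the base, mirroring lastCompletedCheckpointId.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class IncrementalDiffSketch {
    static final String SST_FILE_SUFFIX = ".sst";
    static final String PLACEHOLDER = "<placeholder>";

    // checkpointId -> sst files of that checkpoint, like materializedSstFiles
    final Map<Long, Set<String>> materializedSstFiles = new HashMap<>();
    // records which files were "uploaded" (stands in for materializeStateData())
    final List<String> uploaded = new ArrayList<>();

    // localFiles: names in the local checkpoint directory; baseCheckpointId: last completed checkpoint
    Map<String, String> materialize(long checkpointId, long baseCheckpointId, List<String> localFiles) {
        Set<String> baseSstFiles = materializedSstFiles.get(baseCheckpointId);
        Set<String> sstFiles = new HashSet<>();
        Map<String, String> handles = new LinkedHashMap<>();
        for (String fileName : localFiles) {
            if (fileName.endsWith(SST_FILE_SUFFIX)) {
                boolean existsAlready = baseSstFiles != null && baseSstFiles.contains(fileName);
                // already uploaded by the base checkpoint: placeholder, resolved later from shared state
                handles.put(fileName, existsAlready ? PLACEHOLDER : upload(fileName));
                sstFiles.add(fileName);
            } else {
                // MANIFEST, CURRENT, OPTIONS, ...: always uploaded
                handles.put(fileName, upload(fileName));
            }
        }
        materializedSstFiles.put(checkpointId, sstFiles);
        return handles;
    }

    private String upload(String fileName) {
        uploaded.add(fileName);
        return "remote:" + fileName;
    }
}
```

Because placeholders are resolved from the shared state registry, an sst file referenced by several checkpoints is stored only once remotely.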
    

restore(Collection<KeyedStateHandle> restoreStateHandles)

Correspondingly, restore() decides which restore operation to use based on the implementation type of the KeyedStateHandle.

try {
   if (restoreState == null || restoreState.isEmpty()) {
      createDB();
   } else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) {
      LOG.info("Converting RocksDB state from old savepoint.");
      restoreOldSavepointKeyedState(restoreState);
   } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
      RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
      restoreOperation.restore(restoreState);
   } else {
      RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
      restoreOperation.doRestore(restoreState);
   }
RocksDBIncrementalRestoreOperation

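restoreInstance() - the restore first downloads the files from HDFS into the local directory restoreInstancePath, and then either creates hard links into instanceRocksDBPath, or opens the files as a temporary restoreDb and copies the relevant entries into the current db.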

private void restoreInstance(
      IncrementalKeyedStateHandle restoreStateHandle,
      boolean hasExtraKeys) throws Exception {

   // read state data
   Path restoreInstancePath = new Path(
      stateBackend.instanceBasePath.getAbsolutePath(),
      UUID.randomUUID().toString());

   try {
      final Map<StateHandleID, StreamStateHandle> sstFiles =
         restoreStateHandle.getSharedState();
      final Map<StateHandleID, StreamStateHandle> miscFiles =
         restoreStateHandle.getPrivateState();

      // read the data and write it to the local restoreInstancePath
      readAllStateData(sstFiles, restoreInstancePath);
      readAllStateData(miscFiles, restoreInstancePath);

      // read meta data
      List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
         readMetaData(restoreStateHandle.getMetaStateHandle());

      List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();

      for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {

         ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
            stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
            stateBackend.columnOptions);

         columnFamilyDescriptors.add(columnFamilyDescriptor);
         stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
      }

      // if the key group range has changed (e.g. the parallelism was changed), write only the key groups this instance owns into the db
      if (hasExtraKeys) {

         List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();

         // open a temporary restoreDb with restoreInstancePath as its db path, used for merging: entries owned by this instance are added to the current db
         try (RocksDB restoreDb = stateBackend.openDB(
               restoreInstancePath.getPath(),
               columnFamilyDescriptors,
               columnFamilyHandles)) {

            for (int i = 0; i < columnFamilyHandles.size(); ++i) {
               ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
               ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);

               Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
                  stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());

               if (null == registeredStateMetaInfoEntry) {

                  RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
                     new RegisteredKeyedBackendStateMetaInfo<>(
                        stateMetaInfoSnapshot.getStateType(),
                        stateMetaInfoSnapshot.getName(),
                        stateMetaInfoSnapshot.getNamespaceSerializer(),
                        stateMetaInfoSnapshot.getStateSerializer());

                  registeredStateMetaInfoEntry =
                     new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                        stateBackend.db.createColumnFamily(columnFamilyDescriptor),
                        stateMetaInfo);

                  stateBackend.kvStateInformation.put(
                     stateMetaInfoSnapshot.getName(),
                     registeredStateMetaInfoEntry);
               }

               ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;

               try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {

                  int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
                  byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
                  for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
                     startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
                  }

                  iterator.seek(startKeyGroupPrefixBytes);

                  while (iterator.isValid()) {

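                     int keyGroup = 0;
                     for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
                        // mask with 0xFF: Java bytes are signed, so a prefix byte >= 0x80 would otherwise decode as negative
                        keyGroup = (keyGroup << Byte.SIZE) + (iterator.key()[j] & 0xFF);
                     }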
                      
                     // if this key group belongs to the current backend, put the key/value into the current RocksDB
                     if (stateBackend.keyGroupRange.contains(keyGroup)) {
                        stateBackend.db.put(targetColumnFamilyHandle,
                           iterator.key(), iterator.value());
                     }

                     iterator.next();
                  }
               }
            }
         }
      } else {
         // directly create hard links from restoreInstancePath into instanceRocksDBPath
         // create hard links in the instance directory
         if (!stateBackend.instanceRocksDBPath.mkdirs()) {
            throw new IOException("Could not create RocksDB data directory.");
         }

         createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
         createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);

         List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
         stateBackend.db = stateBackend.openDB(
            stateBackend.instanceRocksDBPath.getAbsolutePath(),
            columnFamilyDescriptors, columnFamilyHandles);

         for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
            RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);

            ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
            RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
               new RegisteredKeyedBackendStateMetaInfo<>(
                  stateMetaInfoSnapshot.getStateType(),
                  stateMetaInfoSnapshot.getName(),
                  stateMetaInfoSnapshot.getNamespaceSerializer(),
                  stateMetaInfoSnapshot.getStateSerializer());

            stateBackend.kvStateInformation.put(
               stateMetaInfoSnapshot.getName(),
               new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
                  columnFamilyHandle, stateMetaInfo));
         }


         // use the restore sst files as the base for succeeding checkpoints
         synchronized (stateBackend.materializedSstFiles) {
            stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
         }
         // set the restored checkpoint id as lastCompletedCheckpointId
         stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
      }
}
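When hasExtraKeys is true, the restore copies only the entries whose key-group prefix falls into this backend's range. The sketch below models a column family as a java.util.TreeMap with an unsigned lexicographic byte[] comparator (standing in for RocksDB's default bytewise ordering), seeks to the first key of the start key group, and stops once it passes the end key group (the excerpt above keeps iterating to the end instead; the result is the same). All names are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class RescaleRestoreSketch {

    // Unsigned lexicographic order, like RocksDB's default bytewise comparator.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    static NavigableMap<byte[], byte[]> newColumnFamily() {
        return new TreeMap<byte[], byte[]>(RescaleRestoreSketch::compareBytes);
    }

    static byte[] prefix(int keyGroup, int prefixBytes) {
        byte[] p = new byte[prefixBytes];
        for (int j = 0; j < prefixBytes; j++) {
            p[j] = (byte) (keyGroup >>> ((prefixBytes - j - 1) * Byte.SIZE));
        }
        return p;
    }

    static int keyGroupOf(byte[] key, int prefixBytes) {
        int keyGroup = 0;
        for (int j = 0; j < prefixBytes; j++) {
            keyGroup = (keyGroup << Byte.SIZE) | (key[j] & 0xFF);
        }
        return keyGroup;
    }

    // Copies the entries of key groups [startKeyGroup, endKeyGroup] from the restored db into the target db.
    static void copyOwnKeyGroups(NavigableMap<byte[], byte[]> restoredDb, NavigableMap<byte[], byte[]> targetDb,
                                 int startKeyGroup, int endKeyGroup, int prefixBytes) {
        // like iterator.seek(startKeyGroupPrefixBytes): first key >= prefix of the start key group
        for (Map.Entry<byte[], byte[]> e : restoredDb.tailMap(prefix(startKeyGroup, prefixBytes), true).entrySet()) {
            if (keyGroupOf(e.getKey(), prefixBytes) > endKeyGroup) {
                break; // keys are sorted by key group, nothing further belongs to this backend
            }
            targetDb.put(e.getKey(), e.getValue());
        }
    }
}
```

With a signed comparison, key group 200 (first prefix byte pair 0x00 0xC8) would sort before key group 3, so the seek-and-scan would break; this is another place where the signedness of Java bytes matters.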

readAllStateData() - copies the files from HDFS, via each remoteFileHandle, into restoreInstancePath.

private void readAllStateData(
   Map<StateHandleID, StreamStateHandle> stateHandleMap,
   Path restoreInstancePath) throws IOException {

   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
      StateHandleID stateHandleID = entry.getKey();
      StreamStateHandle remoteFileHandle = entry.getValue();
      // read from remoteFileHandle into restoreInstancePath on the local disk
      readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
   }
}

I have not looked into the details of the meta data.
