LevelDB数据存储大数据

9. LevelDB源码剖析之Current文件\Manifes

2017-08-13  本文已影响48人  随安居士

9.1 基本原理

版本信息有什么用?先来简要说明三个类的具体用途:

如果还不能给出答案,将上述三个类当作一个整体,再来看Version类组到底包含了哪些信息:

  1. 运行信息
    1.1 运行期各种递增ID值:log number(log编号)、next file number(下一个文件编号)、last sequence(单条write操作递增该编号,可认为是版本号)、prev log number(目前已弃用)。
    1.2 比较器名称
  2. 数据库元信息
    2.1 各Level的SSTable文件列表
    2.2 SSTable缓存
  3. Compaction信息
    3.1 Compaction Pointer
    3.2 通过Seek触发Compaction信息(文件名、Level);通过Compaction触发Compaction信息(score、level)

关于版本信息到底有什么用这个话题暂时先放一放,来看具体类。

9.2 VersionSet

VersionSet维护了一份Version列表,包含当前Alive的所有Version信息,列表中第一个代表数据库的当前版本。
VersionSet类只有一个实例,在DBImpl(数据库实现类)类中,维护所有活动的Version对象,来看VersionSet的所有语境。

9.2.1 数据库启动时

通过Current文件加载Manifset文件,读取Manifest文件完成版本信息恢复。

Status VersionSet::Recover(bool *save_manifest)
{
  ......
  //从current file中读取mainfest文件名
  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok())
  {
    return s;
  }
  if (current.empty() || current[current.size() - 1] != '\n')
  {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  //打开mainfest
  std::string dscname = dbname_ + "/" + current;
  SequentialFile *file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok())
  {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
    Slice record;
    std::string scratch;

    //依次读取manifest中的VersionEdit信息,构建VersionSet
    while (reader.ReadRecord(&record, &scratch) && s.ok())
    {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok())
      {
         //Comparator不一致时,返回错误信息
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name())
        {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());

              //实际上,这里可以直接break
        }
      }

      //构建当前Version
      if (s.ok())
      {
        builder.Apply(&edit);
      }

    ......
    }
  }
  delete file;
  file = NULL;

  ......
  
  if (s.ok())
  {
    Version *v = new Version(this);
    builder.SaveTo(v);

     //计算下次执行压缩的Level
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;

    // See if we can reuse the existing MANIFEST file.
    if (ReuseManifest(dscname, current))
    {
      // No need to save new manifest
    }
    else
    {
      *save_manifest = true;
    }
  }

  return s;
}

Recover通过Manifest恢复VersionSet及Current Version信息,恢复完毕后Alive的Version列表中仅包含当Current Version对象。

9.2.2 Compaction时

Compaction(压缩)应该是LevelDB中最为复杂的功能,它需要Version类组的深度介入。来看VersionSet中所有和Compaction相关的接口声明。

        // Apply *edit to the current version to form a new descriptor that
        // is both saved to persistent state and installed as the new
        // current version.  Will release *mu while actually writing to the file.
        // REQUIRES: *mu is held on entry.
        // REQUIRES: no other thread concurrently calls LogAndApply()
        Status LogAndApply(VersionEdit* edit, port::Mutex* mu);

        // Pick level and inputs for a new compaction.
        // Returns NULL if there is no compaction to be done.
        // Otherwise returns a pointer to a heap-allocated object that
        // describes the compaction.  Caller should delete the result.
        Compaction* PickCompaction();

        // Return a compaction object for compacting the range [begin,end] in
        // the specified level.  Returns NULL if there is nothing in that
        // level that overlaps the specified range.  Caller should delete
        // the result.
        Compaction* CompactRange(
            int level,
            const InternalKey* begin,
            const InternalKey* end);

        // Create an iterator that reads over the compaction inputs for "*c".
        // The caller should delete the iterator when no longer needed.
        Iterator* MakeInputIterator(Compaction* c);

        // Returns true iff some level needs a compaction.
        bool NeedsCompaction() const {
            Version* v = current_;
            return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
        }

        // Add all files listed in any live version to *live.
        // May also mutate some internal state.
        void AddLiveFiles(std::set<uint64_t>* live);

数据库的读、写操作都可能触发Compaction,通过调用NeedCompaction判定是否需要执行Compaction,如需Compaction则调用PickCompaction获取Compactoin信息。

其他几个方法也和Compaction操作相关,其中LogAndApply非常重要,它将VersionEdit应用于Current Version、VersoinEdit持久化到Manifest文件、将新的Version做为Current Version。

Status VersionSet::LogAndApply(VersionEdit *edit, port::Mutex *mu)
{
  if (edit->has_log_number_)
  {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  }
  else
  {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_)
  {
    edit->SetPrevLogNumber(prev_log_number_);
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  //1. New Version = Current Version + VersionEdit
  Version *v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }

  //2. 重新计算Compaction Level\Compaction Score
  Finalize(v);

  //3. 打开数据库时,创建新的Manifest并保存当前版本信息
  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == NULL)
  {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok())
    {
      descriptor_log_ = new log::Writer(descriptor_file_);
      //当前版本信息
      s = WriteSnapshot(descriptor_log_);
    }
  }


  //4. 保存增量信息,即VersionEdit信息
  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Write new record to MANIFEST log
    if (s.ok())
    {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok())
      {
        s = descriptor_file_->Sync();
      }
      if (!s.ok())
      {
        Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty())
    {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  //5. 将新的版本添加到Alive版本列表,并将其做为Current Version
  // Install the new version
  if (s.ok())
  {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  }
  else
  {
    delete v;
    if (!new_manifest_file.empty())
    {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}

9.2.3 读取数据时

LevelDB通过VersionSet中的TableCache对象完成数据读取。
TableCache是SSTable的缓存类,NewIterator方法通过传入指定的文件编号返回该文件的Iterator供外部使用。

class TableCache {
    public:
        TableCache(const std::string& dbname, const Options* options, int entries);
        ~TableCache();

        // Return an iterator for the specified file number (the corresponding
        // file length must be exactly "file_size" bytes).  If "tableptr" is
        // non-NULL, also sets "*tableptr" to point to the Table object
        // underlying the returned iterator, or NULL if no Table object underlies
        // the returned iterator.  The returned "*tableptr" object is owned by
        // the cache and should not be deleted, and is valid for as long as the
        // returned iterator is live.
        Iterator* NewIterator(const ReadOptions& options,
            uint64_t file_number,
            uint64_t file_size,
            Table** tableptr = NULL);

        // Evict any entry for the specified file number
        void Evict(uint64_t file_number);

    private:
        Env* const env_;
        const std::string dbname_;
        const Options* options_;
        Cache* cache_;
    };

缓存机制主要通过Cache对象实现,关于Cache的备忘下节会讲。

9.3 Version

Version维护了一份当前版本的SSTable的元数据,其对外暴露的接口大部分也和元数据相关:

void GetOverlappingInputs(
            int level,
            const InternalKey* begin,         // NULL means before all keys
            const InternalKey* end,           // NULL means after all keys
            std::vector<FileMetaData*>* inputs);

        // Returns true iff some file in the specified level overlaps
        // some part of [*smallest_user_key,*largest_user_key].
        // smallest_user_key==NULL represents a key smaller than all keys in the DB.
        // largest_user_key==NULL represents a key largest than all keys in the DB.
        bool OverlapInLevel(int level,
            const Slice* smallest_user_key,
            const Slice* largest_user_key);

        // Return the level at which we should place a new memtable compaction
        // result that covers the range [smallest_user_key,largest_user_key].
        int PickLevelForMemTableOutput(const Slice& smallest_user_key,
            const Slice& largest_user_key);

        int NumFiles(int level) const { return files_[level].size(); }

还有两个数据库读取操作相关的方法Get、UpdateStats,来看Get:

Status Version::Get(const ReadOptions& options,
        const LookupKey& k,
        std::string* value,
        GetStats* stats)
    {
        Slice ikey = k.internal_key();
        Slice user_key = k.user_key();
        const Comparator* ucmp = vset_->icmp_.user_comparator();
        Status s;

        stats->seek_file = NULL;
        stats->seek_file_level = -1;
        FileMetaData* last_file_read = NULL;
        int last_file_read_level = -1;

        // We can search level-by-level since entries never hop across
        // levels.  Therefore we are guaranteed that if we find data
        // in an smaller level, later levels are irrelevant.
        std::vector<FileMetaData*> tmp;
        FileMetaData* tmp2;

        //1. 查找包含指定Key的所有文件
        for (int level = 0; level < config::kNumLevels; level++) {
            size_t num_files = files_[level].size();
            if (num_files == 0) continue;

            // Get the list of files to search in this level
            FileMetaData* const* files = &files_[level][0];
            if (level == 0) {    //1.1 Level-0可能存在多个文件均包含该Key
                // Level-0 files may overlap each other.  Find all files that
                // overlap user_key and process them in order from newest to oldest.
                tmp.reserve(num_files);
                for (uint32_t i = 0; i < num_files; i++) {
                    FileMetaData* f = files[i];
                    if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
                        ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
                        tmp.push_back(f);
                    }
                }
                if (tmp.empty()) continue;

                std::sort(tmp.begin(), tmp.end(), NewestFirst);    //将文件按更新顺序排列
                files = &tmp[0];
                num_files = tmp.size();
            }
            else {            //1.2 Level-0之上,一个Key只可能存在于一个文件中
                // Binary search to find earliest index whose largest key >= ikey.
                uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
                if (index >= num_files) {
                    files = NULL;
                    num_files = 0;
                }
                else {
                    tmp2 = files[index];
                    if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
                        // All of "tmp2" is past any data for user_key
                        files = NULL;
                        num_files = 0;
                    }
                    else {
                        files = &tmp2;
                        num_files = 1;
                    }
                }
            }

            //2. 遍历所有文件,查找Key值数据。
            for (uint32_t i = 0; i < num_files; ++i) {
                if (last_file_read != NULL && stats->seek_file == NULL) {
                    // We have had more than one seek for this read.  Charge the 1st file.
                    stats->seek_file = last_file_read;
                    stats->seek_file_level = last_file_read_level;
                }

                FileMetaData* f = files[i];
                last_file_read = f;
                last_file_read_level = level;

                //2.1 SSTable迭代器
                Iterator* iter = vset_->table_cache_->NewIterator(
                    options,
                    f->number,
                    f->file_size);
                iter->Seek(ikey);    //2.2 查找指定Key
                const bool done = GetValue(iter, user_key, value, &s);    //2.3 Get Value
                if (!iter->status().ok()) {
                    s = iter->status();
                    delete iter;
                    return s;
                }
                else {
                    delete iter;
                    if (done) {
                        return s;
                    }
                }
            }
        }

        return Status::NotFound(Slice());  // Use an empty error message for speed
    }

9.4 VersionEdit

版本建变化除运行期编号修改外,最主要的是SSTable文件的增删信息。当Compaction执行时,必然会出现部分SSTable无效被移除,合并生成的新SSTable被加入到数据库中。VersionEdit提供AddFile、DeleteFile完成变更标识。

VersionEdit提供的另外一个主要功能接口声明如下:

void VersionEdit::EncodeTo(std::string* dst) const {
        //1. 序列化比较器
        if (has_comparator_) {
            PutVarint32(dst, kComparator);
            PutLengthPrefixedSlice(dst, comparator_);
        }
        //2. 序列化运行期编号信息
        if (has_log_number_) {
            PutVarint32(dst, kLogNumber);
            PutVarint64(dst, log_number_);
        }
        if (has_prev_log_number_) {
            PutVarint32(dst, kPrevLogNumber);
            PutVarint64(dst, prev_log_number_);
        }
        if (has_next_file_number_) {
            PutVarint32(dst, kNextFileNumber);
            PutVarint64(dst, next_file_number_);
        }
        if (has_last_sequence_) {
            PutVarint32(dst, kLastSequence);
            PutVarint64(dst, last_sequence_);
        }
        //3. 序列化Compact Pointer
        for (size_t i = 0; i < compact_pointers_.size(); i++) {
            PutVarint32(dst, kCompactPointer);
            PutVarint32(dst, compact_pointers_[i].first);  // level
            PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
        }

        //4. 序列化本次版本变化的SSTable文件列表
        for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
        iter != deleted_files_.end();
            ++iter) {
            PutVarint32(dst, kDeletedFile);
            PutVarint32(dst, iter->first);   // level
            PutVarint64(dst, iter->second);  // file number
        }

        for (size_t i = 0; i < new_files_.size(); i++) {
            const FileMetaData& f = new_files_[i].second;
            PutVarint32(dst, kNewFile);
            PutVarint32(dst, new_files_[i].first);  // level
            PutVarint64(dst, f.number);
            PutVarint64(dst, f.file_size);
            PutLengthPrefixedSlice(dst, f.smallest.Encode());
            PutLengthPrefixedSlice(dst, f.largest.Encode());
        }
    }

9.5 总结

回到最开始的问题:版本信息由什么用?

每个LevelDB有一个Current File,Current File内唯一的信息为:当前数据库的Manifest文件名。Manifest中包含了上次运行后全部的版本信息,LevelDB通过Manifest文件恢复版本信息。

LevelDB的版本信息为富语义功能组,它所包含的信息已经大大超出了版本定义本身。如果将Version类封装为结构体、VersionSet仅仅为Version列表、VersionEdit也是单纯的结构数据,再为上述结构提供多套功能类应该更为合理。目前来看,这应当算作LevelDB实现的一处臭味。


转载请注明:【随安居士】http://www.jianshu.com/p/27e48eae656d

上一篇下一篇

猜你喜欢

热点阅读