RocksDBC++

RocksDB. Merge Operator

2017-06-07  本文已影响1632人  周肃

1 为什么需要Merge Operator

RocksDB是一个高性能嵌入式持久化key-value存储引擎,提供了常规的Put,Get,Delete接口。还有一个常见的用法是,更新一个已经存在的key对应的value。用户需要做三件事:

假设需要维护一组counter,每个counter都有不同的名字,需要为counter实现Set、Add、Get和Remove操作。
接口定义如下:

class Counters {
 public:
  // (re)set the value of a named counter
  virtual void Set(const string& key, uint64_t value);

  // remove the named counter
  virtual void Remove(const string& key);

  // retrieve the current value of the named counter, return false if not found
  virtual bool Get(const string& key, uint64_t *value);

  // increase the named counter by value.
  // if the counter does not exist,  treat it as if the counter was initialized to zero
  virtual void Add(const string& key, uint64_t value);
  };

对于Add接口的实现,可能是这样的

      // implemented as get -> modify -> set
      virtual void Add(const string& key, uint64_t value) {
        uint64_t base;
        if (!Get(key, &base)) {
          base = kDefaultValue;
        }
        Set(key, base + value);
      }

考虑到Get这种随机读操作相对还是比较慢的,如果RocksDB可以提供Add接口,那么可以这样调用

    virtual void Add(const string& key, uint64_t value) {
      string serialized = Serialize(value);
      db->Add(add_option, key, serialized);
    }

这看起来很合理,但是只适用于counter的场景。存在RocksDB中的数据并不都是counter,比如有可能是一个链表保存用户的位置信息,我们希望有一个Append接口将用户新的位置信息追加在链表后面。所以,这类更新操作的语义,依赖于用户的数据类型。RockDB抽象了这个更新操作,让用户可以自定义更新语义,在RocksDB中,叫做Merge

2 Merge

Merge接口提供了下面的语义

  1. 封装了read - modify - write语义,对外统一提供简单的抽象接口
  2. 减少用户重复触发Get操作引入的性能损耗
  3. 通过决定合并操作的时间和方式,来优化后端性能,并达到并不改变底层更新的语义
  4. 渐进式的更新,来均摊更新带来带来的性能损耗,以得到渐进式的性能提升。(Can, in some cases, amortize the cost over all incremental updates to provide asymptotic increases in efficiency.)

3 Merge接口的使用

MergeOperator

MergeOperator定义了几个方法来告诉RocksDB应该如何在已有的数据上做增量更新。这些方法(PartialMerge、FullMerge)可以组成新的merge操作。
RocksDB提供了接口AssociativeMergeOperator,这个接口封装了partial merge的实现细节,可以满足大部分场景的需要(数据类型是关联的)。
比AssociativeMergeOperator功能更多的是Generic MergeOperator,后面将会介绍它的应用场景。

AssociativeMergeOperator的接口声明

// The simpler, associative merge operator.
class AssociativeMergeOperator : public MergeOperator {
 public:
  virtual ~AssociativeMergeOperator() {}

  // Gives the client a way to express the read -> modify -> write semantics
  // key:           (IN) The key that's associated with this merge operation.
  // existing_value:(IN) null indicates the key does not exist before this op
  // value:         (IN) the value to update/merge the existing_value with
  // new_value:    (OUT) Client is responsible for filling the merge result
  // here. The string that new_value is pointing to will be empty.
  // logger:        (IN) Client could use this to log errors during merge.
  //
  // Return true on success.
  // All values passed in will be client-specific values. So if this method
  // returns false, it is because client specified bad data or there was
  // internal corruption. The client should assume that this will be treated
  // as an error by the library.
  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
                     const Slice& value,
                     std::string* new_value,
                     Logger* logger) const = 0;


 private:
  // Default implementations of the MergeOperator functions
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override;

  virtual bool PartialMerge(const Slice& key,
                            const Slice& left_operand,
                            const Slice& right_operand,
                            std::string* new_value,
                            Logger* logger) const override;
};

自定义MergeOperator

用户需要定义一个子类,继承AssociativeMergeOperator或者MergeOperator基类,重载用到的接口。
RocksDB持有一个MergeOperator类型的成员变量,并提供了Merge接口。用户将自定义的MergeOperator子类赋值给DB对应的成员变量,这样RocksDB可以调用用户定义的Merge方法,达到用户定义merge语义的目的。

    // In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
    class DB {
      ...
      // Merge the database entry for "key" with "value". Returns OK on success,
      // and a non-OK status on error. The semantics of this operation is
      // determined by the user provided merge_operator when opening DB.
      // Returns Status::NotSupported if DB does not have a merge_operator.
      virtual Status Merge(
        const WriteOptions& options,
        const Slice& key,
        const Slice& value) = 0;
      ...
    };

    Struct Options {
      ...
      // REQUIRES: The client must provide a merge operator if Merge operation
      // needs to be accessed. Calling Merge on a DB without a merge operator
      // would result in Status::NotSupported. The client must ensure that the
      // merge operator supplied here has the same name and *exactly* the same
      // semantics as the merge operator provided to previous open calls on
      // the same DB. The only exception is reserved for upgrade, where a DB
      // previously without a merge operator is introduced to Merge operation
      // for the first time. It's necessary to specify a merge operator when
      // opening the DB in this case.
      // Default: nullptr
      const std::shared_ptr<MergeOperator> merge_operator;
      ...
    };

自定义MergeOperator并使用的一个例子

    // A 'model' merge operator with uint64 addition semantics
    class UInt64AddOperator : public AssociativeMergeOperator {
     public:
      virtual bool Merge(
        const Slice& key,
        const Slice* existing_value,
        const Slice& value,
        std::string* new_value,
        Logger* logger) const override {

        // assuming 0 if no existing value
        uint64_t existing = 0;
        if (existing_value) {
          if (!Deserialize(*existing_value, &existing)) {
            // if existing_value is corrupted, treat it as 0
            Log(logger, "existing value corruption");
            existing = 0;
          }
        }

        uint64_t oper;
        if (!Deserialize(value, &oper)) {
          // if operand is corrupted, treat it as 0
          Log(logger, "operand value corruption");
          oper = 0;
        }

        auto new = existing + oper;
        *new_value = Serialize(new);
        return true;        // always return true for this, since we treat all errors as "zero".
      }

      virtual const char* Name() const override {
        return "UInt64AddOperator";
       }
    };

    // Implement 'add' directly with the new Merge operation
    class MergeBasedCounters : public RocksCounters {
     public:
      MergeBasedCounters(std::shared_ptr<DB> db);

      // mapped to a leveldb Merge operation
      virtual void Add(const string& key, uint64_t value) override {
        string serialized = Serialize(value);
        db_->Merge(merge_option_, key, serialized);
      }
    };

    // How to use it
    DB* dbp;
    Options options;
    options.merge_operator.reset(new UInt64AddOperator);
    DB::Open(options, "/tmp/db", &dbp);
    std::shared_ptr<DB> db(dbp);
    MergeBasedCounters counters(db);
    counters.Add("a", 1);
    ...
    uint64_t v;
    counters.Get("a", &v);

Generic MergeOperator

关联性和非关联性

前面有提到过, 使用AssociativeMergeOperator的一个前提是:数据类型的关联性,即:

例如上面的Counter的例子,调用Set接口,RocksDB将data保存为序列化的8字节整数。调用Add接口,data也是8字节整数。

MergeOperator还可以用于非关联型数据类型的更新。
例如,在RocksDB中保存json字符串,即Put接口写入data的格式为合法的json字符串。而Merge接口只希望更新json中的某个字段。所以代码可能是这样:

    ...
    // Put/store the json string into to the database
    db_->Put(put_option_, "json_obj_key",
             "{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");

    ...

    // Use a pre-defined "merge operator" to incrementally update the value of the json string
    db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
    db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");

AssociativeMergeOperator无法处理这种场景,因为它假设Put和Merge的数据格式是关联的。我们需要区分Put和Merge的数据格式,也无法把多个merge操作数合并成一个。这时候就需要Generic MergeOperator。

Generic MergeOperator 接口

    // The Merge Operator
    //
    // Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
    // client knows. It could be numeric addition, list append, string
    // concatenation, edit data structure, ... , anything.
    // The library, on the other hand, is concerned with the exercise of this
    // interface, at the right time (during get, iteration, compaction...)
    class MergeOperator {
     public:
      virtual ~MergeOperator() {}

      // Gives the client a way to express the read -> modify -> write semantics
      // key:         (IN) The key that's associated with this merge operation.
      // existing:    (IN) null indicates that the key does not exist before this op
      // operand_list:(IN) the sequence of merge operations to apply, front() first.
      // new_value:  (OUT) Client is responsible for filling the merge result here
      // logger:      (IN) Client could use this to log errors during merge.
      //
      // Return true on success. Return false failure / error / corruption.
      virtual bool FullMerge(const Slice& key,
                             const Slice* existing_value,
                             const std::deque<std::string>& operand_list,
                             std::string* new_value,
                             Logger* logger) const = 0;

      // This function performs merge(left_op, right_op)
      // when both the operands are themselves merge operation types.
      // Save the result in *new_value and return true. If it is impossible
      // or infeasible to combine the two operations, return false instead.
      virtual bool PartialMerge(const Slice& key,
                                const Slice& left_operand,
                                const Slice& right_operand,
                                std::string* new_value,
                                Logger* logger) const = 0;

      // The name of the MergeOperator. Used to check for MergeOperator
      // mismatches (i.e., a DB created with one MergeOperator is
      // accessed using a different MergeOperator)
      virtual const char* Name() const = 0;
    };

工作原理

当调用DB::Put()和DB:Merge()接口时, 并不需要立刻计算最后的结果. RocksDB将计算的动作延后触发, 例如在下一次用户调用Get, 或者RocksDB决定做Compaction时. 所以, 当merge的动作真正开始做的时候, 可能积压(stack)了多个操作数需要处理. 这种情况就需要MergeOperator::FullMerge来对existing_value和一个操作数序列进行计算, 得到最终的值.

PartialMerge和Stacking

有时候, 在调用FullMerge之前, 可以先对某些merge操作数进行合并处理, 而不是将它们保存起来, 这就是PartialMerge的作用: 将两个操作数合并为一个, 减少FullMerge的工作量.
当遇到两个merge操作数时, RocksDB总是先会尝试调用用户的PartialMerge方法来做合并, 如果PartialMerge返回false才会保存操作数. 当遇到Put/Delete操作, 就会调用FullMerge将已存在的值和操作数序列传入, 计算出最终的值.

Merge Best Practice

什么场景使用Merge

如果有如下需求,可以使用merge。

使用Associative Merge的场景

使用Generic Merge的场景


限于篇幅, 这一篇主要是基于官方的wiki, 介绍Merge操作的使用和特性, 源码分析将放在下一篇.

参考资料:
https://github.com/facebook/rocksdb/wiki/Merge-Operator

上一篇 下一篇

猜你喜欢

热点阅读