RocksDB. Merge Operator
1 为什么需要Merge Operator
RocksDB是一个高性能嵌入式持久化key-value存储引擎,提供了常规的Put,Get,Delete接口。还有一个常见的用法是,更新一个已经存在的key对应的value。用户需要做三件事:
- 调用Get接口,获取value的值
- 修改它
- 调用Put接口将它写回数据库
假设需要维护一组counter,每个counter都有不同的名字,需要为counter实现Set、Add、Get和Remove操作。
接口定义如下:
class Counters {
public:
// (re)set the value of a named counter
virtual void Set(const string& key, uint64_t value);
// remove the named counter
virtual void Remove(const string& key);
// retrieve the current value of the named counter, return false if not found
virtual bool Get(const string& key, uint64_t *value);
// increase the named counter by value.
// if the counter does not exist, treat it as if the counter was initialized to zero
virtual void Add(const string& key, uint64_t value);
};
对于Add接口的实现,可能是这样的
// implemented as get -> modify -> set
virtual void Add(const string& key, uint64_t value) {
uint64_t base;
if (!Get(key, &base)) {
base = kDefaultValue;
}
Set(key, base + value);
}
考虑到Get这种随机读操作相对还是比较慢的,如果RocksDB可以提供Add接口,那么可以这样调用
virtual void Add(const string& key, uint64_t value) {
string serialized = Serialize(value);
db->Add(add_option, key, serialized);
}
这看起来很合理,但是只适用于counter的场景。存在RocksDB中的数据并不都是counter,比如有可能是一个链表保存用户的位置信息,我们希望有一个Append接口将用户新的位置信息追加在链表后面。所以,这类更新操作的语义,依赖于用户的数据类型。RockDB抽象了这个更新操作,让用户可以自定义更新语义,在RocksDB中,叫做Merge
。
2 Merge
Merge接口提供了下面的语义
- 封装了read - modify - write语义,对外统一提供简单的抽象接口
- 减少用户重复触发Get操作引入的性能损耗
- 通过决定合并操作的时间和方式,来优化后端性能,并达到并不改变底层更新的语义
- 渐进式的更新,来均摊更新带来带来的性能损耗,以得到渐进式的性能提升。(Can, in some cases, amortize the cost over all incremental updates to provide asymptotic increases in efficiency.)
3 Merge接口的使用
MergeOperator
MergeOperator定义了几个方法来告诉RocksDB应该如何在已有的数据上做增量更新。这些方法(PartialMerge、FullMerge)可以组成新的merge操作。
RocksDB提供了接口AssociativeMergeOperator,这个接口封装了partial merge的实现细节,可以满足大部分场景的需要(数据类型是关联的)。
比AssociativeMergeOperator功能更多的是Generic MergeOperator,后面将会介绍它的应用场景。
AssociativeMergeOperator的接口声明
// The simpler, associative merge operator.
class AssociativeMergeOperator : public MergeOperator {
public:
virtual ~AssociativeMergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result
// here. The string that new_value is pointing to will be empty.
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success.
// All values passed in will be client-specific values. So if this method
// returns false, it is because client specified bad data or there was
// internal corruption. The client should assume that this will be treated
// as an error by the library.
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
private:
// Default implementations of the MergeOperator functions
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override;
};
自定义MergeOperator
用户需要定义一个子类,继承AssociativeMergeOperator或者MergeOperator基类,重载用到的接口。
RocksDB持有一个MergeOperator类型的成员变量,并提供了Merge接口。用户将自定义的MergeOperator子类赋值给DB对应的成员变量,这样RocksDB可以调用用户定义的Merge方法,达到用户定义merge语义的目的。
// In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
class DB {
...
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.
// Returns Status::NotSupported if DB does not have a merge_operator.
virtual Status Merge(
const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
...
};
Struct Options {
...
// REQUIRES: The client must provide a merge operator if Merge operation
// needs to be accessed. Calling Merge on a DB without a merge operator
// would result in Status::NotSupported. The client must ensure that the
// merge operator supplied here has the same name and *exactly* the same
// semantics as the merge operator provided to previous open calls on
// the same DB. The only exception is reserved for upgrade, where a DB
// previously without a merge operator is introduced to Merge operation
// for the first time. It's necessary to specify a merge operator when
// opening the DB in this case.
// Default: nullptr
const std::shared_ptr<MergeOperator> merge_operator;
...
};
自定义MergeOperator并使用的一个例子
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public AssociativeMergeOperator {
public:
virtual bool Merge(
const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (!Deserialize(*existing_value, &existing)) {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption");
existing = 0;
}
}
uint64_t oper;
if (!Deserialize(value, &oper)) {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption");
oper = 0;
}
auto new = existing + oper;
*new_value = Serialize(new);
return true; // always return true for this, since we treat all errors as "zero".
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public RocksCounters {
public:
MergeBasedCounters(std::shared_ptr<DB> db);
// mapped to a leveldb Merge operation
virtual void Add(const string& key, uint64_t value) override {
string serialized = Serialize(value);
db_->Merge(merge_option_, key, serialized);
}
};
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new UInt64AddOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db(dbp);
MergeBasedCounters counters(db);
counters.Add("a", 1);
...
uint64_t v;
counters.Get("a", &v);
Generic MergeOperator
关联性和非关联性
前面有提到过, 使用AssociativeMergeOperator的一个前提是:数据类型的关联性,即:
- 调用Put接口写入RocksDB的数据的格式和Merge接口是相同的
- 使用用户自定义的merge操作,可以将多个merge操作数合并成一个
例如上面的Counter的例子,调用Set接口,RocksDB将data保存为序列化的8字节整数。调用Add接口,data也是8字节整数。
MergeOperator还可以用于非关联型数据类型的更新。
例如,在RocksDB中保存json字符串,即Put接口写入data的格式为合法的json字符串。而Merge接口只希望更新json中的某个字段。所以代码可能是这样:
...
// Put/store the json string into to the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use a pre-defined "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
AssociativeMergeOperator无法处理这种场景,因为它假设Put和Merge的数据格式是关联的。我们需要区分Put和Merge的数据格式,也无法把多个merge操作数合并成一个。这时候就需要Generic MergeOperator。
Generic MergeOperator 接口
// The Merge Operator
//
// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
};
- MergeOperator提供了两个方法, FullMerge和PartialMerge. 第一个方法用于对已有的值做Put或Delete操作. 第二个方法用于在可能的情况下将两个操作数合并.
- AssociativeMergeOperator继承了MergeOperator, 并提供了这些方法的默认实现, 暴露了简化后的接口.
- MergeOperator的FullMerge方法的传入exsiting_value和一个操作数序列, 而不是单独的一个操作数.
工作原理
当调用DB::Put()和DB:Merge()接口时, 并不需要立刻计算最后的结果. RocksDB将计算的动作延后触发, 例如在下一次用户调用Get, 或者RocksDB决定做Compaction时. 所以, 当merge的动作真正开始做的时候, 可能积压(stack)了多个操作数需要处理. 这种情况就需要MergeOperator::FullMerge来对existing_value和一个操作数序列进行计算, 得到最终的值.
PartialMerge和Stacking
有时候, 在调用FullMerge之前, 可以先对某些merge操作数进行合并处理, 而不是将它们保存起来, 这就是PartialMerge的作用: 将两个操作数合并为一个, 减少FullMerge的工作量.
当遇到两个merge操作数时, RocksDB总是先会尝试调用用户的PartialMerge方法来做合并, 如果PartialMerge返回false才会保存操作数. 当遇到Put/Delete操作, 就会调用FullMerge将已存在的值和操作数序列传入, 计算出最终的值.
Merge Best Practice
什么场景使用Merge
如果有如下需求,可以使用merge。
- 数据需要增量更新
- 经常需要读数据,而不知道数据的新value
使用Associative Merge的场景
- merge 操作数的格式和Put相同
- 多个顺序的merge操作数可以合并成一个
使用Generic Merge的场景
- merge 操作数的格式和Put不同
- 当多个merge操作数可以合并时,PartialMerge()方法返回true
限于篇幅, 这一篇主要是基于官方的wiki, 介绍Merge操作的使用和特性, 源码分析将放在下一篇.
参考资料:
https://github.com/facebook/rocksdb/wiki/Merge-Operator