badger (一个高性能的LSM K/V store)使用指南
badfer是一个纯Go实现的快速的嵌入式K/V数据库,针对LSM tree做了优化。
安装
$ go get github.com/dgraph-io/badger/...
数据库
打开一个数据库
opts := badger.DefaultOptions
opts.Dir = "/tmp/badger"
opts.ValueDir = "/tmp/badger"
db, err := badger.Open(opts)
if err != nil {
log.Fatal(err)
}
defer db.Close()
存储
存储kv
使用 Txn.Set()方法
err := db.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte("answer"), []byte("42"))
return err
})
批量设置
wb := db.NewWriteBatch()
defer wb.Cancel()
for i := 0; i < N; i++ {
err := wb.Set(key(i), value(i), 0) // Will create txns as needed.
handle(err)
}
handle(wb.Flush()) // Wait for all txns to finish.
WriteBatch不允许任何读取。对于读-修改-写,应该使用事务API。
设置生存时间 TTL
Badger允许在键上设置一个可选的生存时间(TTL)值。一旦TTL结束,KEY将不再是可检索的,并且将进行垃圾收集。TTL可以使用Txn.SetWithTTL() 设置为一个time.Duration
的值
设置元数据
Txn.SetWithMeta()
设置用户元数据
使用 Txn.SetEntry()
可以一次性设置key, value, user metatadata and TTL
遍历keys
要遍历键,我们可以使用迭代器,可以使用 Txn.NewIterator()`方法获得迭代器。迭代按字节字典排序顺序进行。
err := db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
err := item.Value(func(v []byte) error {
fmt.Printf("key=%s, value=%s\n", k, v)
return nil
})
if err != nil {
return err
}
}
return nil
})
前缀扫描
要遍历键前缀,可以将Seek()和ValidForPrefix()组合使用:
db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := []byte("1234")
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
err := item.Value(func(v []byte) error {
fmt.Printf("key=%s, value=%s\n", k, v)
return nil
})
if err != nil {
return err
}
}
return nil
})
键的遍历
Badger支持一种独特的迭代模式,称为只有键的迭代。它比常规迭代快几个数量级,因为它只涉及对lsm树的访问,而lsm树通常完全驻留在RAM中。要启用只有键的迭代,您需要设置IteratorOptions。PrefetchValues字段为false。这还可以用于在迭代期间对选定的键执行稀疏读取,只在需要时调用item.Value()。
err := db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
fmt.Printf("key=%s\n", k)
}
return nil
})
数据流
Badger提供了一个流框架,它可以并发地遍历数据库的全部或部分,将数据转换为自定义键值,并连续地将数据流输出,以便通过网络发送、写入磁盘,甚至写入Badger。这是比使用单个迭代器更快的遍历Badger的方法。Stream在管理模式和正常模式下都支持Badger。
stream := db.NewStream()
// db.NewStreamAt(readTs) for managed mode.
// -- Optional settings
stream.NumGo = 16 // Set number of goroutines to use for iteration.
stream.Prefix = []byte("some-prefix") // Leave nil for iteration over the whole DB.
stream.LogPrefix = "Badger.Streaming" // For identifying stream logs. Outputs to Logger.
// ChooseKey is called concurrently for every key. If left nil, assumes true by default.
stream.ChooseKey = func(item *badger.Item) bool {
return bytes.HasSuffix(item.Key(), []byte("er"))
}
// KeyToList is called concurrently for chosen keys. This can be used to convert
// Badger data into custom key-values. If nil, uses stream.ToList, a default
// implementation, which picks all valid key-values.
stream.KeyToList = nil
// -- End of optional settings.
// Send is called serially, while Stream.Orchestrate is running.
stream.Send = func(list *pb.KVList) error {
return proto.MarshalText(w, list) // Write to w.
}
// Run the stream
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
// Done.
删除一个key
使用Txn.Delete()
方法删除一个key
获取key value
通过 txn.Get获取value
err := db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte("answer"))
handle(err)
var valNot, valCopy []byte
err := item.Value(func(val []byte) error {
// This func with val would only be called if item.Value encounters no error.
// Accessing val here is valid.
fmt.Printf("The answer is: %s\n", val)
// Copying or parsing val is valid.
valCopy = append([]byte{}, val...)
// Assigning val slice to another variable is NOT OK.
valNot = val // Do not do this.
return nil
})
handle(err)
// DO NOT access val here. It is the most common cause of bugs.
fmt.Printf("NEVER do this. %s\n", valNot)
// You must copy it to use it outside item.Value(...).
fmt.Printf("The answer is: %s\n", valCopy)
// Alternatively, you could also use item.ValueCopy().
valCopy, err = item.ValueCopy(nil)
handle(err)
fmt.Printf("The answer is: %s\n", valCopy)
return nil
})
如果不存在 Txn.Get()
将会返回一个 ErrKeyNotFound
错误
请注意,Get()返回的值只在事务打开时有效。如果需要在事务外部使用值,则必须使用copy()将其复制到另一个字节片。
事务
只读事务
只读事务使用 DB.View()方法
err := db.View(func(txn *badger.Txn) error {
// Your code here…
return nil
})
读写事务锁
读写事务可以使用 DB.Update()方法
err := db.Update(func(txn *badger.Txn) error {
// Your code here…
return nil
})
手动管理事务
直接使用DB.NewTransaction()
函数,手动创建和提交事务。它接受一个布尔参数来指定是否需要读写事务。对于读写事务,需要调用Txn.Commit()
来确保事务已提交。对于只读事务,调用
txn.reject()
就可以了。commit()
也在内部调用
txn .reject()
来清除事务,因此只需调用Txn.Commit()就足以执行读写事务。
但是,如果您的代码由于某种原因(出错)没有调用Txn.Commit()
。就需要在defer中调用 txn . reject()
// Start a writable transaction.
txn := db.NewTransaction(true)
defer txn.Discard()
// Use the transaction...
err := txn.Set([]byte("answer"), []byte("42"))
if err != nil {
return err
}
// Commit the transaction and check for error.
if err := txn.Commit(); err != nil {
return err
}