一个简单的etcdV3样例
2018-10-08 本文已影响0人
fjxCode
// main walks through the basic etcd v3 client operations against a local
// endpoint: Put, Get, Delete, a compare-and-swap transaction, and finally a
// prefix Watch that prints every event it receives.
//
// Besides context, fmt and clientv3, this sample needs the standard "time"
// package to be imported.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"127.0.0.1:2379"},
		// DialTimeout is a time.Duration (a nanosecond count); a bare 100
		// would mean 100ns, which is far too short for a real connection.
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// Without a client nothing below can work, and the deferred Close
		// must not be registered on it.
		fmt.Println(err)
		return
	}
	defer cli.Close()

	// Put
	if _, err = cli.Put(context.TODO(), "/test/k1", "v1"); err != nil {
		fmt.Println(err)
	}

	// Get: read back the key that was just written.
	if resp, err := cli.Get(context.TODO(), "/test/k1"); err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("resp:", resp)
	}

	// Del
	if _, err = cli.Delete(context.TODO(), "/test/k1"); err != nil {
		fmt.Println(err)
	}

	// TXN: compare the value of /test/k1 and write to that same key in either
	// branch. The transaction gets its own context with a one-second timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Txn(ctx).
		If(clientv3.Compare(clientv3.Value("/test/k1"), ">", "v1")).
		Then(clientv3.OpPut("/test/k1", "abc")).
		Else(clientv3.OpPut("/test/k1", "ABC")).
		Commit()
	// Release the timeout context as soon as the transaction has finished.
	cancel()
	if err != nil {
		fmt.Println(err)
	}

	// Watch: an empty key combined with WithPrefix is used here to observe
	// changes under every key. The loop runs until the watch channel closes.
	rangeWatch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
	for watchResp := range rangeWatch {
		for _, ev := range watchResp.Events {
			fmt.Printf("%s %q :%q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
DialTimeout的类型是time.Duration,裸数字的单位是纳秒,应写成5*time.Second这样的形式。即使是本机启动,连接超时时间也不宜少于1秒。
v3版 get的数据结构发生了变化,获取的是KV集合,字节数组需要转换,按如下方式获取KV值。
// Read a single key. resp must not be touched when Get fails, and Kvs is
// empty when the key does not exist, so iterate instead of indexing Kvs[0].
resp, err := cli.Get(context.TODO(), "/test/key1")
if err != nil {
	fmt.Println(err)
} else {
	for _, kv := range resp.Kvs {
		// Key and Value are byte slices; convert them for printing.
		fmt.Println(string(kv.Key))
		fmt.Println(string(kv.Value))
	}
}
Get节点及子节点的值:
// Add the WithPrefix() option to read the node together with its children.
// Every returned pair is printed, not only the first one; an empty result
// simply prints nothing.
resp, err = cli.Get(context.TODO(), "/test", clientv3.WithPrefix())
if err != nil {
	fmt.Println(err)
} else {
	for _, kv := range resp.Kvs {
		fmt.Println(string(kv.Key), string(kv.Value))
	}
}
Txn事务完成了一次CAS原子操作。需要在context中配置超时时间。ctx用完后要调用cancel()取消(不等待超时到期)。
调用了clientv3的Value方法取值,Compare方法比较,OpPut方法赋值,最后调用Commit方法提交,完成CAS操作。此处路径全部加/,使用的是绝对路径。
Watch方法返回一个WatchChan,从这个通道中可以陆续接收到若干个WatchResponse:
// WatchResponse is one message received from a watch channel. It bundles a
// response header with the events carried by that message, plus flags that
// describe the state of the watcher.
type WatchResponse struct {
// Header is the response header attached to this message.
Header pb.ResponseHeader
// Events holds the events delivered in this response.
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
// closeErr is unexported; presumably the error surfaced through Err() when
// the watch fails — confirm against the library source.
closeErr error
// cancelReason is a reason of canceling watch
cancelReason string
}
Get方法和Watch方法的返回结果,都附带了响应头:
// ResponseHeader is the header attached to responses. It identifies the
// cluster and member that sent the response and records the store revision
// and raft term at which the request was applied.
type ResponseHeader struct {
// cluster_id is the ID of the cluster which sent the response.
ClusterId uint64 `protobuf:"varint,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
// member_id is the ID of the member which sent the response.
MemberId uint64 `protobuf:"varint,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"`
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// received in this stream are guaranteed to have a higher revision number than the
// header.revision number.
Revision int64 `protobuf:"varint,3,opt,name=revision,proto3" json:"revision,omitempty"`
// raft_term is the raft term when the request was applied.
RaftTerm uint64 `protobuf:"varint,4,opt,name=raft_term,json=raftTerm,proto3" json:"raft_term,omitempty"`
}
%q用于字符串或字节切片时,输出的是用双引号括起来并经过转义的字符串字面值(用于整数或rune时,才是单引号括起来的字符字面值)。watchEvent.Kv.Key是字节切片([]byte),用fmt.Printf()加占位符%q打印的方式更好。
rangeWatch := cli.Watch(context.Background(),"",clientv3.WithPrefix())
for watchResp := range rangeWatch{
for _,watchEvent := range watchResp.Events{
fmt.Println(watchEvent.Type,string(watchEvent.Kv.Key))
}
}
// Watch every key (empty key plus WithPrefix) and print each event's type,
// key and value. %q quotes the byte slices directly, so no string conversion
// is needed; the trailing newline keeps one event per output line.
rangeWatch := cli.Watch(context.Background(), "", clientv3.WithPrefix())
for watchResp := range rangeWatch {
	for _, watchEvent := range watchResp.Events {
		fmt.Printf("%s,%q,%q\n", watchEvent.Type, watchEvent.Kv.Key, watchEvent.Kv.Value)
	}
}
事务Txn为context包装了一个超时时间。
context.TODO()和context.Background()都返回一个空的context。context.Background()没有截止时间,也没有cancel方法可以取消,一般用作main方法中cli操作的上下文。
context.WithTimeout()只针对一个操作,单独配置一个带超时的上下文,如Txn事务,用完后调用它返回的cancel函数取消。context.TODO()本身同样没有截止时间,也无法取消,只是在还不确定该用哪个上下文时的占位。