cli,err := clientv3.New(clientv3.Config{
DialTimeout: 5 * time.Second,
- Endpoints:etcd的多个节点服务地址,因为我是单点本机测试,所以只传1个。
- DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
type Client struct {
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
- Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
- KV:我们主要使用的功能,即操作K-V。
- Lease:租约相关操作,比如申请一个TTL=10秒的租约。
- Watcher:观察订阅,从而监听最新的数据变化。
- Auth:管理etcd的用户和权限,属于管理员操作。
- Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。
1.2.1. kv对象的实例获取
kv := clientev3.NewKV(client)
type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
1.2.2. k-v 存储 put
putResp, err := kv.Put(context.TODO(),"/data-dir/example", "hello-world!")
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
ctx: Context包对象,是用来跟踪上下文的,列如超时控制
key: 存储对象的key
val: 存储对象的value
opts: 可变参数,额外选项
1.2.3 k-v查询 get
getResp, err := kv.Get(context.TODO(), "/data-dir/example")
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
从以上数据的存储和取值,我们知道put 返回PutResponse, get返回GetResponse,注意:不同的KV操作对应不同的response结构,定义如下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
type KeyValue struct {
// key is the key in bytes. An empty key is not allowed.
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// create_revision is the revision of last creation on this key.
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
// mod_revision is the revision of last modification on this key.
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
// value is the value held by the key, in bytes.
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
rangeResp, err := kv.Get(context.TODO(), "/data-dir/example", clientv3.WithPrefix())
withPrefix实际上会转化为范围查询,它根据前缀/data-dir/example生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。
lease := clientv3.NewLease(client)
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
- Grant:分配一个租约。
- Revoke:释放一个租约。
- TimeToLive:获取剩余TTL时间。
- Leases:列举所有etcd中的租约。
- KeepAlive:自动定时的续约某个租约。
- KeepAliveOnce:为某个租约续约一次。
- Close:貌似是关闭当前客户端建立的所有租约。
grantResp, err := lease.Grant(context.TODO(), 10)
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
ID LeaseID
TTL int64
Error string
kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
// sleep一会...
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
ID LeaseID
TTL int64
KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。Etcd并没有提供API来实现原子的Put with Lease。
1.4 Op
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
func OpDelete(key string, opts …OpOption) Op
func OpGet(key string, opts …OpOption) Op
func OpPut(key, val string, opts …OpOption) Op
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
op1 := clientv3.OpPut("/hi", "hello", clientv3.WithPrevKV())
opResp, err := kv.Do(context.TODO(), op1)
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
etcd中事务是原子执行的,只支持if … then … else …这种表达,能实现一些有意思的场景。
txn := kv.Txn(context.TODO())
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=", "hello")).
Else(clientv3.OpGet("/test/", clientv3.WithPrefix())).
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
func Value(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
func Compare(cmp Cmp, result string, v interface{}) Cmp {
var r pb.Compare_CompareResult
switch result {
case "=":
r = pb.Compare_EQUAL
case "!=":
r = pb.Compare_NOT_EQUAL
case ">":
r = pb.Compare_GREATER
case "<":
r = pb.Compare_LESS
panic("Unknown result op")
cmp.Result = r
switch cmp.Target {
case pb.Compare_VALUE:
val, ok := v.(string)
if !ok {
panic("bad compare value")
cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
case pb.Compare_VERSION:
cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
case pb.Compare_CREATE:
cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
case pb.Compare_MOD:
cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
case pb.Compare_LEASE:
cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
panic("Unknown compare type")
return cmp
- key=xxx的value,必须!=,hello。
- key=xxx的create版本号,必须=,11233。
- key=xxx的lease id,必须=,12319231231238。
- func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
- func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
- func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
- func Value(key string) Cmp:key=xxx的创建值必须满足…
- func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
type TxnResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// succeeded is set to true if the compare evaluated to true or false otherwise.
Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"`
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
Responses []*ResponseOp `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"`
b、我们使用watch监听这对K-V,如果一切正常, 这时候请求会被阻塞住.
{ "action":"set", "node":{ "key":"/name", "value":"神蛋使者1号", "modifiedIndex":25, "createdIndex":25 }, "prevNode": { "key":"/name", "value":"神蛋使者", "modifiedIndex":24, "createdIndex":24 } }
1.6.1. watch接口定义
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
该接口定义了两个方法, Watch 和 Close
Watch 方法返回一个WatchChan 类似的变量, WatchChan是一个channel, 定义如下:
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
closeErr error
type Event struct {
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
// watcher implements the Watcher interface
type watcher struct {
remote pb.WatchClient
// mu protects the grpc streams map
mu sync.RWMutex
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
watcher结构很简单, 只有3个字段. remote抽象了发起watch请求的客户端, streams是一个map, 这个map映射了交互的数据流.还有一个保护并发环境下数据流读写安全的读写锁.
streams所属的watchGrpcStream类型抽象了所有交互的数据, 它的结构定义如下
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
比较有意思的是, watchGrpcStream也包含了一个watcher类型的owner字段, watcher和watchGrpcStream可以互相引用到对方.同时又定义了watcher类型中已经定义过的remote,而且还不是指针类型, 这点不大明白作用是啥.
还有几个字段值得关注, 一个是substreams, 看下它的定义和注释:
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// 应用配置
ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
// 根据传入的参数构造watch请求
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
ok := false
// 将请求上下文格式化为字符串
ctxKey := fmt.Sprintf("%v", ctx)
// find or allocate appropriate grpc watch stream
// 接下来配置对应的输出流, 注意得加锁
// 如果stream为空, 返回一个已经关闭的channel.
// 这种情况应该是防止streams为空的情况
if w.streams == nil {
// closed
ch := make(chan WatchResponse)
return ch
// 注意这里, 前面我们提到streams是一个map,该map的key是请求上下文
// 如果该请求对应的流为空,则新建
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
donec := wgs.donec
reqc := wgs.reqc
// couldn't create channel; return closed channel
// couldn't create channel; return closed channel
// 这里要设置为缓冲的原因可能与下面的两个
// closeCh <- WatchResponse{closeErr: wgs.closeErr}
// 语句有关,这里不理解
closeCh := make(chan WatchResponse, 1)
// submit request
select {
// 发送上面构造好的watch请求给对应的流
case reqc <- wr:
ok = true
// 请求断开(这里应该囊括了客户端请求断开的所有情况)
case <-wr.ctx.Done():
// watch完成
// 这里应该是处理非正常完成的情况
// 注意下面的重试逻辑
case <-donec:
if wgs.closeErr != nil {
// 如果不是空上下文导致流被丢弃的情况
// 则不应该重试
closeCh <- WatchResponse{closeErr: wgs.closeErr}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
// receive channel
// 如果是初始请求顺利发送才会执行这里
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
return closeCh
func (w *watcher) Close() (err error) {
// 在锁内先将streams字段置为空
// 在锁外再将一个个流都关闭
// 这样做的意义在于不管哪个流关闭失败了
// 都能先保证streams与这些流的关系被切断
streams := w.streams
w.streams = nil
for _, wgs := range streams {
if werr := wgs.Close(); werr != nil {
err = werr
// etcd竟然也只是返回一个error
// 虽然上面的for循环可能产生多个error
return err