etcd lock

2021-05-17  本文已影响0人  九日火

Etcd Lock详解

分布式情况下最终都会面临一个资源抢占的问题,解决问题的方法为抽象一个分布式锁,持有锁则可以操作资源,本文使用etcd实现一个分布式锁

简介

在正式使用之前,先简单介绍一下etcd的clientv3,etcd的client和服务器通信在v3版本为grpc,所以我们在使用时实际上就是使用grpc和服务器通信.

etcd的github地址为: https://github.com/etcd-io/etcd

在github中源代码中存在两个目录:

$ tree -L 1
├── alarm
.....
├── client
├── clientv3
.....

其中 clientv3为我们需要使用的client,本文的主角就是clientv3/concurrency

下载依赖

在明确我们使用的包后,我们直接依赖clientv3包到我们的工程中.

go get github.com/etcd-io/etcd/clientv3

如果需要梯子,请参考: 终端中设置代理

连接到etcd

在很多环境中我们启动etcd都是通过配置tls方式进行的,所以在连接etcd的时候需要使用tls的方式连接(可是百度上很多文章居然都没写.),具体的连接方式如下:

    tlsInfo := transport.TLSInfo{
        CertFile:      "etcd-v3.3.12-linux-amd64/etcd.pem",
        KeyFile:       "etcd-v3.3.12-linux-amd64/etcd-key.pem",
        TrustedCAFile: "etcd-v3.3.12-linux-amd64/ca.pem",
    }
    tlsConfig, err := tlsInfo.ClientConfig()
    if err != nil {
        log.Fatal(err)
    }
    config := clientv3.Config{
        Endpoints: []string{"127.0.0.1:12379"},#此处为etcd server监听地址
        TLS:       tlsConfig,
    }
    client, e := clientv3.New(config)
    if e != nil {
        log.Fatal(e.Error())
    }
    defer client.Close()

使用Mutex获取锁

在使用证书连接到etcd的server后,我们使用clientv3/concurrency包中的Mutex进行分布式锁

    # 生成一个30s超时的上下文
    timeout, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancelFunc()
    # 获取租约
    response, e := client.Grant(timeout, 30)
    if e != nil {
        log.Fatal(e.Error())
    }
    # 通过租约创建session
    session, e := concurrency.NewSession(client, concurrency.WithLease(response.ID))
    if e != nil {
        log.Fatal(e.Error())
    }
    defer session.Close()
    # 通过session和锁前缀
    mutex := concurrency.NewMutex(session, "/lock")
    e = mutex.Lock(timeout)
    if e != nil {
        log.Fatal(e.Error())
    }

    # 业务逻辑

    # 释放锁
    defer mutex.Unlock(timeout)

整体流程还是相对复杂的,接下来我们将一点一点解析:

1.生成30s超时的上下文(context)

锁的竞争是有时间的,不可能一直竞争下去,设定一个30s的超时时间,是让mutex.Lock()的时候有一个获取锁的时间,在此时间内,如果没有获取到锁则应该重试或者提示失败.

2.client.Grant获取租约

租约设置了一个和上下文一样的超时时间,保证租约有足够的时间

3.根据租约创建一个session回话,维护租约的过期时间

在client中抽象出了一个session对象来持续保持租约不过期,具体源码为:

//会话表示在客户端的生存期内保持活动的租约。
//应用程序可能会使用会话来解释活动性。
type Session struct {
    client *v3.Client
    opts   *sessionOptions
    id     v3.LeaseID

    cancel context.CancelFunc
    donec  <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops)
    }

    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = v3.LeaseID(resp.ID)
    }

    ctx, cancel := context.WithCancel(ops.ctx)
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
    //在客户端错误或取消上下文之前保持租约的活动状态
    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            //在保持活动频道关闭前接收信息
            // eat messages until keep alive channel closes
        }
    }()

    return s, nil
}

其实就是在newSession中启动一个协程不断读取keepAlive这个channel的数据

4.concurrency.NewMutex创建一个锁,调用lock开始竞争锁

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    //生成锁的key
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    //使用事务机制
    //比较key的revision为0(0标示没有key)
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    //则put key,并设置租约
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    //否则 获取这个key,重用租约中的锁(这里主要目的是在于重入)
    //通过第二次获取锁,判断锁是否存在来支持重入
    //所以只要租约一致,那么是可以重入的.
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    //通过前缀获取最先创建的key
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    //获取到自身的revision(注意,此处CreateRevision和Revision不一定相等)
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // 通过对比自身的revision和最先创建的key的revision得出谁获得了锁
    // 例如 自身revision:5,最先创建的key createRevision:3  那么不获得锁,进入waitDeletes
    //     自身revision:5,最先创建的key createRevision:5  那么获得锁

    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    // 等待其他程序释放锁,并删除其他revisions
    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}
// 等待持有锁的key删除
// 内部实现为等其他所有比当前createRevision小的key,监听删除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //WithLastCreate 按照CreateRevision排序,降序 例如 5 4 3 2 1 
    //WithMaxCreateRev 获取比maxCreateRev小的key
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        //监听删除事件
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}
//从revision开始监听删除事件,因为revision存在,所以也避免了ABA问题
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var wr v3.WatchResponse
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
            //遇到删除事件才返回
            if ev.Type == mvccpb.DELETE {
                return nil
            }
        }
    }
    if err := wr.Err(); err != nil {
        return err
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    return fmt.Errorf("lost watcher waiting for delete")
}

CreateRevision: 创建时的revision

Header.Revision: etcd server现在的revision

ABA问题: CAS和ABA问题

5.释放锁

这里释放锁,比较简单,就是删除此key

//释放锁
func (m *Mutex) Unlock(ctx context.Context) error {
    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

revision

刚才说了那么多revision,这里我们需要详细的了解一下etcd中的mvcc多版本机制

那么我们先来明确几个概念:

main ID:在etcd中每个事务的唯一id,全局递增不重复.

sub ID: 在事务中的连续多个修改操作会从0开始编号,这个编号就是sub ID

revision: 由(mainID,subID)组成的唯一标识

所以现在我们的revision就是这么来的啦…

上一篇下一篇

猜你喜欢

热点阅读