工作生活

etcd分布式锁lease keepalive导致的gorout

2019-07-05  本文已影响0人  7亮

分布式锁实现

demo参考:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_mutex_test.go
基本原理:在etcd事务中查询key的revision是否为0,等于0则创建key和value,表示抢锁成功;不等于0则返回最早创建该key的revision信息,表示抢锁失败。更详细的实现留个TODO。

问题现象

问题分析

// keep the lease alive until client error or cancelled context <- 核心注释
go func() {
    defer close(donec)
    for range keepAlive {
        // eat messages until keep alive channel closes
    }
}()
func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
    select {
    case <-donec:
        return
    case <-l.donec:
        return
    case <-ctx.Done():
    }
    //略
}
func (m *Mutex) Unlock(ctx context.Context) error {
    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

unlock直接将key删除,可能会出现其他服务抢到锁,临界代码再次被执行。这样能解决当前问题,但会引入其他问题。

解决方案

unlock

lock expire

如何停止keepalive?分析源码

    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    s1, err := concurrency.NewSession(cli)  //<- 问题入口函数
    if err != nil {
        log.Fatal(err)
    }
    defer s1.Close()
    m1 := concurrency.NewMutex(s1, "/my-lock/")

    // acquire lock for s1
    if err := m1.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
    fmt.Println("acquired lock for s1")
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops)
    }
        
    //如果没有租约lease,则申请一个新的,其TTL是可以通过参数WithTTL设置,不设置默认defaultSessionTTL 60s。
    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = v3.LeaseID(resp.ID)
    }

    ctx, cancel := context.WithCancel(ops.ctx) 
    //KeepAlive参数ctx的parent来源于ops.ctx,通过此context可以cancel keepalive,停止keepalive channel。
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive { //<- 阻塞点之一,因为keepalive goroutine不停续租,没有close该channel
            // eat messages until keep alive channel closes
        }
    }()

    return s, nil
}

sessionOption对session做参数赋值操作,类似gRPC的client grpc.DialContext的代码。将结构字段赋值做成可变参数,做到针对性的定制化,并提供了WithTTL WithLease WithContext三种参数选项,分别设置新申请租约的TTL、设置已有租约、设置context。如果不熟悉context请先熟悉context的parent canel机制。

太累了,不写了。看一下goroutine执行图,有留言我再写。

lease keepalive执行图

etcd v3 lease keepalve.jpg
上一篇 下一篇

猜你喜欢

热点阅读