golang学习篇章

go分布式锁

2021-01-17  本文已影响0人  Best博客

基于etcd 实现代码

package etcdlock

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "math/rand"
    "time"
)

type etcdMutex struct {
    client  *clientv3.Client
    key     string             //lock用到的key
    ttl     int64              //租约时间 默认10秒
    cancel  context.CancelFunc //关闭续租的func
    lease   clientv3.Lease
    leaseID clientv3.LeaseID
}

func NewEtcdMutex(client *clientv3.Client, key string) (em *etcdMutex, err error) {
    em = &etcdMutex{ttl: 10, client: client, key: key}
    var ctx context.Context
    em.lease = clientv3.NewLease(em.client)
    leaseResp, err := em.lease.Grant(context.TODO(), em.ttl)
    if err != nil {
        return
    }
    ctx, em.cancel = context.WithCancel(context.TODO())
    em.leaseID = leaseResp.ID
    _, err = em.lease.KeepAlive(ctx, em.leaseID)
    return
}

func (em *etcdMutex) Lock(ctx context.Context) (err error) {
    for {
        select {
        case <-ctx.Done():
            err = fmt.Errorf("获取锁超时")
            return
        default:
            if err := em.lock(); err != nil {
                rand.Seed(time.Now().UnixNano())
                s := time.Duration(rand.Intn(1500))
                s += 150
                continue
            }
            return
        }
    }
}

func (em *etcdMutex) UnLock() (err error) {
    em.cancel()
    _, err = em.lease.Revoke(context.TODO(), em.leaseID)
    return
}

func (em *etcdMutex) lock() error {
    //LOCK:
    txn := clientv3.NewKV(em.client).Txn(context.TODO())
    txn.If(clientv3.Compare(clientv3.CreateRevision(em.key), "=", 0)).
        Then(clientv3.OpPut(em.key, "", clientv3.WithLease(em.leaseID))).
        Else()
    txnResp, err := txn.Commit()
    if err != nil {
        return err
    }
    if !txnResp.Succeeded { //判断txn.if条件是否成立
        return fmt.Errorf("抢锁失败")
    }
    return nil
}

//etcd实现分布式锁要简单并且比redis靠谱一些,因为etcd RAFT分布式一致性协议
func main() {
    var conf = clientv3.Config{
        Endpoints:   []string{"localhost:2379"}, //etcd连接地址
        DialTimeout: 5 * time.Second,            //连接etcd超时时间
    }

    client, err := clientv3.New(conf)
    if err != nil {
        fmt.Println(err)
        return
    }
    //伪造20客户端抢锁
    for l := 0; l < 20; l++ {
        go func() {
            eMutex1, _ := NewEtcdMutex(client, "hugh122233132")
            ctx, _ := context.WithTimeout(context.TODO(), 10*time.Second)
            err = eMutex1.Lock(ctx)
            if err != nil {
                fmt.Println("lock 在一定时间内获取锁失败,这个时候你如果还需要继续抢就改为继续执行就是了")
                return
            }
            fmt.Println("抢到锁了")
            time.Sleep(time.Second * 1) //执行1秒业务逻辑
            err = eMutex1.UnLock()
            if err != nil {
                fmt.Println("主动释放etcd的锁失败 这个时候等etcd租约自动过期")
            }
        }()

    }

    time.Sleep(time.Second * 30)
}


基于redis 实现代码

https://github.com/go-redsync/redsync
这个项目已经封装了golang版本的red-lock
注意以下几点就行了:
1.red-lock貌似得你单独拎出1台以上的相互独立的redis出来,也就是它们间没有任务联系。这有点尴尬,所以redis去实现分布式锁你最好再想个兜底的。。。比如说用mysql多版本再原子性的控制一下(之所以说mysql是因为一般的项目都有mysql+redis)

  1. 这个包里面的lock并不像我们go源码里面的m :=sync.Mutex{};m.Lock() 它在尝试一定抢锁次数之后会返回err,业务里面记得根据err做对应处理。

3.如果各方面要求都很高就换一种方案吧,比如说 消息队列,但就又引入了新的东西。。。

package main

import (
    goredislib "github.com/go-redis/redis/v8"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
    // Create a pool with go-redis (or redigo) which is the pool redisync will
    // use while communicating with Redis. This can also be any pool that
    // implements the `redis.Pool` interface.
    client := goredislib.NewClient(&goredislib.Options{
        Addr: "localhost:6379",
    })
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

    // Create an instance of redisync to be used to obtain a mutual exclusion
    // lock.
    rs := redsync.New(pool)

    // Obtain a new mutex by using the same name for all instances wanting the
    // same lock.
    mutexname := "my-global-mutex"
    mutex := rs.NewMutex(mutexname)

    // Obtain a lock for our given mutex. After this is successful, no one else
    // can obtain the same lock (the same mutex name) until we unlock it.
    if err := mutex.Lock(); err != nil {
        panic(err)
    }

    // Do your work that requires the lock.

    // Release the lock so other processes or threads can obtain a lock.
    if ok, err := mutex.Unlock(); !ok || err != nil {
        panic("unlock failed")
    }
}

上一篇下一篇

猜你喜欢

热点阅读