2021/04/08谈谈Go中的sync.Mutex

2021-11-12  本文已影响0人  温岭夹糕

1.基本用法

基本用法

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mylock sync.Mutex
    var wg sync.WaitGroup
    count :=0
    for i:=0 ;i <10 ;i++ {
        wg.Add(1)
        go func (){
            defer  wg.Done()
            for i:=0;i<10000;i++ {
                mylock.Lock()
                count++
                mylock.Unlock()
            }
        }()
    }
    wg.Wait()
    fmt.Println(count)

}

通过使用锁的机制来达到对公共资源读写的原子操作控制

锁的粒度如何?

demo

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mylock sync.Mutex
    var wg sync.WaitGroup
    count :=0
    another_count := 0
    wg.Add(2)
    go func(){
        defer wg.Done()
        defer mylock.Unlock() //在释放锁前进行打印
        mylock.Lock()
        count++
        fmt.Println(count)
        time.Sleep(time.Second*2)
    }()
    go func(){
        defer wg.Done()
        defer mylock.Unlock() 
        mylock.Lock()
        another_count++
        fmt.Println( another_count )
        time.Sleep(time.Second*2)
    }()
    wg.Wait()

}

第一个协程对公共资源count进行加锁修改,第二个协程对公共资源another_count进行加锁修改,发现两协程先后返回,一共耗时4s,是否说明了锁的粒度类似mysql的表级锁,锁全部的公共资源呢

这里其实是错误的,因为在释放锁之前,我们使用了fmt的输出,涉及到了公共资源标准输出的占用
这也提醒了大量协程的打印输出存在数据的竞争

修改demo

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mylock sync.Mutex
    var wg sync.WaitGroup
    count :=0
    another_count := 0
    wg.Add(2)
    go func(){
        defer wg.Done()
        mylock.Lock()
        count++
        mylock.Unlock()
        time.Sleep(time.Second*2)
        fmt.Println(count) //在释放锁之后进行打印
    }()
    go func(){
        defer wg.Done()
        mylock.Lock()
        another_count++
        mylock.Unlock()
        time.Sleep(time.Second*2)
        fmt.Println( another_count )
    }()
    wg.Wait()
}

两协程同时返回,该实验也证明了Go中的锁更类似mysql的行锁,粒度是公共资源的单个变量

补充一点:这里不能使用defer进行fmt的打印,因为函数中有defer时,会首先将defer中需要的变量进行拷贝,等函数执行完毕再执行defer,因此使用defer进行打印结果只会输出原值0

Mutex也可以作为嵌入字段嵌入结构体

以下demo是保护结构体成员原子修改的demo

type MyData struct {
    sync.Mutex
    count int
}

func (d *MyData) Add() {
    d.Lock()
    d.count++
    d.Unlock()
}

func (d *MyData) Read() int {
    defer d.Unlock() //defer 在return后 或 panic前执行
    d.Lock()
    return d.count
}

func main() {
    var wg sync.WaitGroup
    mydata := MyData{count:0}
    for i:=0;i<10;i++ {
        wg.Add(1)
        go func(){
            defer wg.Done()
            for i:=0;i<10000;i++{
                mydata.Add()
            }
        }()
    }
    wg.Wait()
    fmt.Println( mydata.Read() )
}

当结构体有多字段,一般把Mutex放在要控制的字段上面(仅仅是美观,方便阅读,并无特殊作用)

多个协程同时等待释放锁,哪些会先获取到执行机会

等待的goroutine们是以FIFO排队的

  1. 当Mutex处于正常模式时,若此时没有新goroutine与队头goroutine竞争,则队头goroutine获得。若有新goroutine竞争大概率新goroutine获得。
  2. 当队头goroutine竞争锁失败1ms后,它会将Mutex调整为饥饿模式。进入饥饿模式后,锁的所有权会直接从解锁goroutine移交给队头goroutine,此时新来的goroutine直接放入队尾。
  3. 当一个goroutine获取锁后,如果发现自己满足下列条件中的任何一个#1它是队列中最后一个#2它等待锁的时间少于1ms,则将锁切换回正常模式

以上简略翻译自https://golang.org/src/sync/mutex.go 中注释Mutex fairness.

为什么Mutex只需要声明不需要初始化

我们尝试打印一下

func main() {
    var mylock sync.Mutex
    fmt.Printf("%v\n",mylock)

}
//结果为  {0,0}

是一个0值的结构体,我们再看一下源码中的结构

type Mutex struct {
        state int32
        sema  uint32
}

Mutex 的零值是还没有 goroutine 等待的未加锁的状态,所以你不需要额外的初始化,直接声明变量(如 var mu sync.Mutex)即可。
同时这也引出我们的下一个话题Mutex的演进过程

2.Mutex的演进过程

大致演进过程可总结为如下图 摘自极客时间go并发编程实战

2.1初版使用flag标记是否持有锁


   // CAS操作,当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    }    

其中cas是atomic包的前身,保证了内存地址的原子操作更新值
Mutex结构体

    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }

小结

初版的Mutex利用key判断是否被加锁,并记录多少协程需要(持有和等待获取)这个锁。但是从结构体中不难看出,并未记录持有这个锁的协程的信息,Unlock也没有检查是否是当前持有锁的协程释放锁(Mutex的这个设计一直保存至今)。
那么不就代表其他协程也可以释放锁了??

func main(){
    var wg sync.WaitGroup
    var lock sync.Mutex
    var count int
    wg.Add(2)
    go func(){
        defer wg.Done()
        lock.Lock()
        count++
        fmt.Println(count)
    }()
    go func(){
        defer wg.Done()
        time.Sleep(time.Second)
        lock.Unlock()
        count++
        fmt.Println(count)
    }()
    wg.Wait()
    lock.Unlock() //以下会抛出异常unlock of unlocked mutex
    lock.Unlock()
}

这是一件很危险的事情,因此我们使用锁需要遵循谁申请,谁释放的原则,在同一个方法中获取和释放锁

2.2新的协程也有竞争的机会

相比于初版,结构体发生了改变

   type Mutex struct {
        state int32
        sema  uint32
    }
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexWaiterShift = iota
    )

第一个字段不再仅代表是否持有锁,而是一个字段多个意义

state单字段分为三个字段,按位解析,最小位表示这个锁是否被持有,第二位表示唤醒的协程,第三位表示等待的协程数 image.png

剩余代码还没吃透(对二进制运算不够深刻),具体看鸟叔博客https://colobu.com/2018/12/18/dive-into-sync-mutex/

3.Mutex的错误使用场景

demo

type Counter struct {
    sync.Mutex
    Count int
}


func main() {
    var c Counter
    c.Lock()
    defer c.Unlock()
    c.Count++
    foo(c) // 复制锁
}

// 这里Counter的参数是通过复制的方式传入的
// Go函数的参数都是值赋值传递
func foo(c Counter) {
    c.Lock()
    defer c.Unlock()
    fmt.Println("in foo")
}

结果

fatal error: all goroutines are asleep - deadlock!

分析

  1. main函数加锁
  2. main调用foo,函数拷贝了其副本传递到函数体
  3. foo不知道已经上锁了,尝试用lock来获取锁(但是没有其他协程来释放这个赋值的锁),结果主协程被完全阻塞

利用vet工具检测

go vet demo.go 
# command-line-arguments
./demo.go:20:9: call of foo copies lock value: command-line-arguments.Counter
./demo.go:25:12: foo passes lock by value: command-line-arguments.Counter

提示我们foo函数发生了 锁的复制

重入,当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。

不同于JAVA,Go中的Mutex是不可重入锁,因为它并没有记录持有锁的协程信息,只是修改state的状态


func foo(l sync.Locker) {
    fmt.Println("in foo")
    l.Lock()
    bar(l)
    l.Unlock()
}


func bar(l sync.Locker) {
    l.Lock()
    fmt.Println("in bar")
    l.Unlock()
}


func main() {
    l := &sync.Mutex{}
    foo(l) //注意这里传递的是指针
}
上一篇 下一篇

猜你喜欢

热点阅读