go语言

Go-同步原语与锁(二)WaitGroup、Once与Cond

2019-09-25  本文已影响0人  链人成长chainerup

本文将讲解一下Go语言中的同步原语与锁。会阐述几种常见的锁,剖析其流程,然后针对每种同步原语举几个例子。由于文章比较长,为方便阅读,将这部分拆解为两部分。本文是第二部分 WaitGroup、Once与Cond。

环境: go version go1.8.7 darwin/amd64

1 WaitGroup

1.1 结构

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state.
    state1 [12]byte
    sema   uint32
}

如源码注释中所说,WaitGroup 就是要等一堆goroutines结束。主routine 利用wg的Add设置一个数量去等待,然后每个routine 执行,执行完调用wg的Done函数。同时,Wait在所有routine没有执行完之前,一直在等待。
其主要的函数就是Add, Done, Wait。

1.2 流程

1.2.1 Add与Done

Add 方法的主要作用就是更新 WaitGroup 中持有的计数器 counter。开始设置的是大于0的数量,当每次调用Done的时候,底层就是调用的Add方法,对计数器进行减一操作。当调用 Add 方法导致计数器归零并且还有等待的 Goroutine 时,就会通过 runtime_Semrelease 唤醒处于等待状态的所有 Goroutine

1.2.2 Wait

另一个 WaitGroup 的方法 Wait 就会在当前计数器中保存的数据大于 0 时修改等待 Goroutine 的个数 waiter 并调用 runtime_Semacquire 陷入睡眠状态。

1.3 例子

func TestMultiRoutineWaitGroup(t *testing.T) {
    wg := sync.WaitGroup{}
    wgCount := 10
    wg.Add(wgCount)
    for i := 0; i < wgCount; i++ {
        go func(i int) {
            fmt.Println(i, " in processing")
            wg.Done()
            time.Sleep(time.Millisecond * 1000)
            // 在子routine 中也等待所有的routine结束才执行下面的输出。
            wg.Wait()
            fmt.Println("all done, ", i, " ended!")
        }(i)
    }
    wg.Wait()
    time.Sleep(time.Second)
    fmt.Println("all done")
}

结果:

9  in processing
0  in processing
4  in processing
1  in processing
2  in processing
3  in processing
5  in processing
8  in processing
7  in processing
6  in processing
all done,  2  ended!
all done
all done,  7  ended!
all done,  8  ended!
all done,  0  ended!
all done,  3  ended!
all done,  9  ended!
all done,  5  ended!
all done,  4  ended!
all done,  1  ended!
all done,  6  ended!

2 Once

保证在 Go 程序运行期间 Once 对应的某段代码只会执行一次。

2.1 结构

type Once struct {
    m    Mutex
    done uint32
}

其结构里面包含了一个互斥锁。

2.2 流程

其核心流程就是一个Do函数,Do的操作就是先获取互斥锁,获取到了才执行。

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }
    // Slow-path.
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

2.3 例子

func TestOnce(t *testing.T) {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Println("routine: ", i)
            o.Do(func() {
                // 多试几次,发现并不是一定是第0个 routine 执行。
                fmt.Println(i , " dided")
            })
        }(i)
    }
}

结果:

routine:  1
1  dided
routine:  0
routine:  2
routine:  7
routine:  6
routine:  4
routine:  5
routine:  8
routine:  9
routine:  3

3 Cond

Cond 其实是一个条件变量,通过 Cond 可以让一系列的 Goroutine 都在触发某个事件或者条件时才被唤醒,每一个 Cond 结构体都包含一个互斥锁 L

3.1 结构

type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker 

    notify  notifyList // 需要被通知的routine
    checker copyChecker
}

3.2 流程

Cond中,有一堆routine处于Wait状态,有些routine处于通知的状态。当处于Wait状态的routine收到通知才能继续执行。

3.2.1 Wait

Wait 方法会将当前 Goroutine 陷入休眠状态,它会先调用 runtime_notifyListAdd 将等待计数器 +1,然后解锁并调用 runtime_notifyListWait 等待其他 Goroutine 的唤醒

3.2.2 Broadcast

Broadcast 会唤醒队列中全部的 routine。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
3.2.3 Signal

Signal 唤醒队列最前面的 routine。

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

3.3 例子

func TestCond(t *testing.T) {
    c := sync.NewCond(&sync.Mutex{})
    for i := 0; i < 10; i++ {
        go listen(c, i)
    }
    time.Sleep(1*time.Second)

    // 唤醒等待队列第一个routine ,等待最久的routine
    go signal(c)

    // 唤醒所有
    go broadcast(c)

    time.Sleep(time.Second * 5)

}

func signal(c *sync.Cond) {
    c.L.Lock()
    fmt.Println("before signal===========")
    // 仅仅会唤醒休眠队列前面的routine
    c.Signal()
    fmt.Println("after signal===========")
    c.L.Unlock()
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    fmt.Println("before broadcast===========")
    // 给所有wait的routine发送信号
    c.Broadcast()
    fmt.Println("after broadcast===========")

    c.L.Unlock()
}

func listen(c *sync.Cond, i int) {
    c.L.Lock()
    // 等待信号
    c.Wait()
    fmt.Println( "listen: ", i)
    c.L.Unlock()
    fmt.Println("after listen: ", i)
}

结果:

before signal===========
after signal===========
before broadcast===========
after broadcast===========
listen:  7
after listen:  7
listen:  9
after listen:  9
listen:  8
after listen:  8
listen:  3
after listen:  3
listen:  4
after listen:  4
listen:  6
after listen:  6
listen:  5
after listen:  5
listen:  2
after listen:  2
listen:  0
after listen:  0
listen:  1
after listen:  1

4 总结

本文是《同步原语与锁》的第二部分-WaitGroup, Once, Cond。从结构、流程、demo 三个维度进行了分析,希望对你有所帮助~

5 参考文献

同步原语与锁 https://draveness.me/golang/concurrency/golang-sync-primitives.html
Go 1.8 源码

6 其他

本文是《循序渐进go语言》的第十四篇-《Go-同步原语与锁(二)WaitGroup、Once与Cond》。
如果有疑问,可以直接留言,也可以关注公众号 “链人成长chainerup” 提问留言,或者加入知识星球“链人成长” 与我深度链接~

上一篇下一篇

猜你喜欢

热点阅读