golang

go waitgroup

2019-05-01  本文已影响0人  长安猎人

waitgroup等待一组grroutine完成

主goroutine调用Add来设置需要等待的goroutines数量,然后每个goroutine运行完成并在完成后调用Done。Wait用来阻止并等待所有goroutine完成。

waitgroup主要维护两个计数器,一个是请求计数器v,一个是等待计数器w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。

Add执行时候,请求计数器v+n,Done时v-1,v=0即使结束,触发wait。所谓触发wait是通过信号量实现。

wait支持并发,每次wait执行,w+1,而等待请求计数器v为0触发wait()时,要根据w的数量发送w份信号量,正确触发所有wait()

state返回指向wg.state1中存储的state和sema字段的指针。

wg.Add

添加将可能为负的delta添加到WaitGroup计数器。 如果计数器变为零,则释放在等待时阻止的所有goroutine。 如果计数器变为负数,则抛出panic异常。
备注:在计数器为0时发生的具有正增量的调用必须在wait前发生。具负增量的调用或者具有在计数器大于-时开始的正增量调用可以在任何时间发生。
意思是Add应该在创建要等待的goroutine或其他事件的语句之前执行。
如果重新使用waitgroup等待几个独立的事件集,则必须在返回由先前的wait调用之后发生新的Add调用。

wg.Wait
等待必须与第一个Add同步。 因此,只能为第一个wait写入,否则并发Waits将相互竞争。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg = sync.WaitGroup{}
    var now = time.Now()
    wg.Add(2)
    go func() {
        defer wg.Done()
        time.Sleep(3 * time.Second)
    }()

    go func() {
        defer wg.Done()
        time.Sleep(5 * time.Second)
    }()
    wg.Wait()
    d := time.Since(now) //等效于time.Now.Sub(now)
    fmt.Println(d)
}

源码解读

package sync

import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state, and the other 4 as storage
    // for the sema.
    //WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit
    state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if race.Enabled && delta > 0 && v == int32(delta) {
        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        // wait不等于0说明已经执行了Wait,此时不容许Add
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
        return
    }

    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0
    for ; w != 0; w-- {
        //信号量发出,触发wait结束
        runtime_Semrelease(semap, false)
    }
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        // 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                race.Write(unsafe.Pointer(semap))
            }
            runtime_Semacquire(semap)
            // 信号量来了,代表所有Add都已经Done
            if *statep != 0 {
                // 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读