go waitgroup
2019-05-01 本文已影响0人
长安猎人
waitgroup等待一组grroutine完成
主goroutine调用Add来设置需要等待的goroutines数量,然后每个goroutine运行完成并在完成后调用Done。Wait用来阻止并等待所有goroutine完成。
waitgroup主要维护两个计数器,一个是请求计数器v,一个是等待计数器w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。
Add执行时候,请求计数器v+n,Done时v-1,v=0即使结束,触发wait。所谓触发wait是通过信号量实现。
wait支持并发,每次wait执行,w+1,而等待请求计数器v为0触发wait()时,要根据w的数量发送w份信号量,正确触发所有wait()
state返回指向wg.state1中存储的state和sema字段的指针。
wg.Add
添加将可能为负的delta添加到WaitGroup计数器。 如果计数器变为零,则释放在等待时阻止的所有goroutine。 如果计数器变为负数,则抛出panic异常。
备注:在计数器为0时发生的具有正增量的调用必须在wait前发生。具负增量的调用或者具有在计数器大于-时开始的正增量调用可以在任何时间发生。
意思是Add应该在创建要等待的goroutine或其他事件的语句之前执行。
如果重新使用waitgroup等待几个独立的事件集,则必须在返回由先前的wait调用之后发生新的Add调用。
wg.Wait
等待必须与第一个Add同步。 因此,只能为第一个wait写入,否则并发Waits将相互竞争。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg = sync.WaitGroup{}
var now = time.Now()
wg.Add(2)
go func() {
defer wg.Done()
time.Sleep(3 * time.Second)
}()
go func() {
defer wg.Done()
time.Sleep(5 * time.Second)
}()
wg.Wait()
d := time.Since(now) //等效于time.Now.Sub(now)
fmt.Println(d)
}
源码解读
package sync
import (
"internal/race"
"sync/atomic"
"unsafe"
)
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
//WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit
state1 [3]uint32
}
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
// wait不等于0说明已经执行了Wait,此时不容许Add
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
//信号量发出,触发wait结束
runtime_Semrelease(semap, false)
}
}
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
// 信号量来了,代表所有Add都已经Done
if *statep != 0 {
// 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}