golang - sync.WaitGroup

2022-11-12  本文已影响0人  husky_1

go 版本基于1.18

结构体

结构体定义如下:

type WaitGroup struct {
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers only guarantee that 64-bit fields are 32-bit aligned.
    // For this reason on 32 bit architectures we need to check in state()
    // if state1 is aligned or not, and dynamically "swap" the field order if
    // needed.
    state1 uint64
    state2 uint32
}

当我们初始化一个WaitGroup对象时,其counter值、waiter值、semap值均为0

方法

1. Add

func (wg *WaitGroup) Add(delta int) {
        // 获取计数器和信号量
    statep, semap := wg.state()
        // 竞争检测相关,与功能无关,忽略
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
        // 计数值加上 delta: statep 的前四个字节是计数值,因此将 delta 前移 32位
    state := atomic.AddUint64(statep, uint64(delta)<<32)
        // 当前的counter计数值
    v := int32(state >> 32)
        // 当前的waiter 计数值
    w := uint32(state)
        // 竞争检测,忽略
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
        // counter 计数值<0 , 曝panic 异常
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    // delta > 0 && v == int32(delta) : 表示从 0 开始添加计数值
   // w!=0 :表示已经有了等待者
   // 说明在添加counter计数值的时候,同时添加了等待者,非法操作。添加等待者需要在添加计数值之后
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
         // v>0 : 计数值不等于0,不需要唤醒等待者,直接返回
         // w==0: 没有等待者,不需要唤醒,直接返回
    if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
          // 再次检查数据是否一致
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }

     // 到这里说明计数值为0,且等待者大于0,需要唤醒所有的等待者,并把系统置为初始状态(0状态)
  // 将计数值和等待者数量都置为0
    *statep = 0
          // 唤醒等待者
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

2. Done

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

完成一个任务,将计数值减一,当计数值减为0时,需要唤醒所有的等待者

3.Wait

func (wg *WaitGroup) Wait() {
        // 获取计数器和信号量
    statep, semap := wg.state()
        // 竞争检测,忽略
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
        
    for {
                // 原子操作,获取计数器值
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
                // 所有任务都完成了,counter =0,此时直接退出,即不阻塞
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // waiter 计数器加一
                // 这里会有竞争,比如多个 Wait 调用,或者在同时调用 Add 方法,增加不成功会继续 for 循环
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
                        //   // 增加成功后,阻塞在信号量这里,等待被唤醒
            runtime_Semacquire(semap)
                         // 被唤醒的时候,计数器应该是0状态。如果重用 WaitGroup,需要等 Wait 返回
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

注意事项

使用示例

package main

import (
    "sync"
)

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.example.com/",
    }
    for _, url := range urls {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            // Decrement the counter when the goroutine completes.
            defer wg.Done()
            // Fetch the URL.
            http.Get(url)
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

上一篇 下一篇

猜你喜欢

热点阅读