Go sync.WaitGroup

2021-03-27  本文已影响0人  JunChow520

问题

main goroutine为了等待work goroutine都运行完毕,不得不在程序末尾使用time.Sleep()来休眠一段时间,等待work goroutine充分运行。

$ vim ./test/goroutine_test.go
package test

import (
    "fmt"
    "testing"
    "time"
)

func TestGoRoutine(t *testing.T) {
    for i := 0; i < 10; i++ {
        go fmt.Println(i)
    }
    time.Sleep(time.Second)
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN   TestGoRoutine
9
3
1
2
4
5
6
7
8
0
--- PASS: TestGoRoutine (1.00s)
PASS
ok      command-line-arguments  1.291s

但对于实际应用中,休眠1秒是完全不够的,同时大部分时间都无法预知for循环内代码运行时间的长短,此时就不能使用time.Sleep()来完成等待操作。

可以使用管道来完成上述操作

func TestGoRoutine(t *testing.T) {
    count := 10
    ch := make(chan bool, count)
    for i := 0; i < count; i++ {
        go func(i int) {
            fmt.Println(i)
            ch <- true
        }(i)
    }
    for i := 0; i < count; i++ {
        <-ch
    }
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN   TestGoRoutine
9
0
5
6
7
8
2
1
4
3
--- PASS: TestGoRoutine (0.00s)
PASS
ok      command-line-arguments  0.304s

使用管道可以达到目的,但有些大材小用,因为管道被设计出来不仅仅只是在这里做简单的同步处理的,因此这里使用管道实际上是不合适的。假如有上万、上十万、上百万的循环,也要申请同样数量大小的管道,对内存会是一个不小的开销。

对于这种情况,Golang中有一种工具sync.WaitGroup能更加方便地帮助达到目的。

sync.WaitGroup

Golang中除了使用Channel通道和Mutex互斥锁实现两个并发程序之间的同步外,还可以通过WaitGroup等待组实现多个任务的同步,WaitGroup可以保证在并发环境中完成指定数量的任务。

通俗来讲goroutine分为两类角色,一种gorouine作为一个worker小弟,老老实实的干活。另一种goroutine作为master管理者来监督小弟干活,当然master自身也是一个worker

当有很多worker干活时,master没事干歇着,但同时master又希望得到一个通知,了解所有worker们什么时候干完。

从程序开发角度来看,就是维护一个worker总数和一个channel,每个worker干完就向channel发送一个空messagemaster阻塞在channel的监听上,来一个message就说明有一个worker干完活了,记录下有多少messagemessageworker总数一致则说明全干完活。master就可以关闭channel,验收worker的工作成果。

计数器

WaitGroup内部拥有一个计数器,最初从0开始。

type WaitGroup struct{
  noCopy noCopy
  state1 [3]byte
}
WaitGroup

方法

WaitGroup拥有三个方法分别是Add()Done()Wait()用来控制计数器的数量

Wai't'Group
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()

WaitGroup.Done()WaitGroup.Add(-1)完全等价

func (wg *WaitGroup) Wait()

使用方法

  1. master goroutine通过调用WaitGroup.Add(delta int)来设置worker goroutine的个数,然后创建work goroutine
  2. worker goroutine执行结束后需调用WaitGroup.Done()
  3. master goroutine调用WaitGroup.Wait()且被block阻塞,直到所有的worker goroutine全部执行结束后返回。

例如:

$ vim ./test/sync_test.go
package test

import (
    "fmt"
    "sync"
    "testing"
)

func TestWaitGroup(t *testing.T) {
    count := 10
    //添加goroutine数量
    wg := sync.WaitGroup{}
    wg.Add(count)
    //循环模拟并发
    for i := 0; i < count; i++ {
        go func(i int) {
            fmt.Println(i)
            wg.Done() //设置gorooutine为-1
        }(i)
    }
    //执行main goroutine阻塞,直到所有WaitGroup数量为0。
    wg.Wait()
}
$ go test -v -run TestWaitGroup sync_test.go
=== RUN   TestWaitGroup
9
4
5
6
7
8
2
3
1
0
--- PASS: TestWaitGroup (0.00s)
PASS
ok      command-line-arguments  0.294s

注意

应用

需要一个用户的画像服务,当一个请求到来时需要

假如每个服务的响应时间是20ms到50ms,如果顺序调用服务读取数据不考虑数据整合消耗的时间,服务端整体的响应时间将会在100ms到250ms。先不说业务能不能接受,响应时间显然存在很大的优化空间。最直接的优化方向是取数逻辑总时间应该是单个服务最大消耗时间。

func TestTask(t *testing.T) {
    var wg sync.WaitGroup

    for _,task := range tasks{
        task := task
        wg.Add(1)

        go func(){
            defer wg.Done()
            task()
        }()
    }

    wg.Wait()
}

使用注意

task := task

由于Golang对切片遍历时runtime会将tasks[i]拷贝到task的内存地址中,下标i会变化,而task的内存地址是不会改变的。如果不做此次赋值操作,所有的goroutine可能读取到的都是最后一个task

例如:

func TestTask(t *testing.T) {
    tasks := []func(){
        func() { fmt.Printf("task1 ") },
        func() { fmt.Printf("task2 ") },
    }

    for index, task := range tasks {
        task()
        fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
    }
}
$ go test -v -run TestTask sync_test.go
=== RUN   TestTask
task1 0xc000006040 0xc00003c500
task2 0xc000006040 0xc00003c508
--- PASS: TestTask (0.00s)
PASS
ok      command-line-arguments  0.296s

执行结果说明

func TestTask(t *testing.T) {
    tasks := []func(){
        func() { fmt.Printf("task1 ") },
        func() { fmt.Printf("task2 ") },
    }

    for index, task := range tasks {
        task := task
        task()
        fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
    }
}
$ go test -v -run TestTask sync_test.go
=== RUN   TestTask
task1 0xc0000c0030 0xc0000884f0
task2 0xc0000c0038 0xc0000884f8
--- PASS: TestTask (0.00s)
PASS
ok      command-line-arguments  0.320s

执行结果说明

上一篇下一篇

猜你喜欢

热点阅读