go goroutine 并发安全退出

2022-03-27  本文已影响0人  wayyyy

有时需要通知 goroutine 停止它正在干的事,go 并没有提供一个直接终止 的方法。因为这样会导致之间的共享变量处在未定义的状态。但是如果想要退出两个或者任意多个 goroutine 该怎么办呢?

go 语言中不同 goroutine 之间主要依靠通道进行通信和同步,要同时处理多个通道的发送和接收操作,需要使用select关键字。所以我们可以使用select实现的通道的超时判断。

通过 select 和 default 分支可以很容易实现一个 goroutine 的退出控制:

package main

import (
    "fmt"
    "time"
)

func worker(cannel chan bool) {
    for {
        select {
        default:
            time.Sleep(time.Millisecond * 500)
            fmt.Println("hello")
            // 正常工作
        case <-cannel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)
    go worker(cancel)

    time.Sleep(time.Second)
    cancel <- true

    time.Sleep(time.Second * 10000) // 保持main不退出
}

但是通道的发送和接收操作是一一对应的,如果停止多个 goroutine,那么可能需要创建同样数量的通道,这个代价太大了。其实我们可以通过close() 关闭一个通道来实现广播的效果,所有从关闭通道接收的操作均会收到一个零值和一个可选的失败标志。

package main

import (
    "fmt"
    "time"
)

func worker(i int, cannel chan bool) {
    for {
        select {
        default:
            time.Sleep(time.Millisecond * 500)
            fmt.Println(i, "hello")
            // 正常工作
        case <-cannel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)

    for i := 0; i < 10; i++ {
        go worker(i, cancel)
    }

    time.Sleep(time.Second)
    close(cancel)

    time.Sleep(time.Second * 10000) // 保持main不退出
}

上面的程序并不完善,因为main 退出通过sleep来的,这里加上使用sync.WaitGroup来完善实现main的安全退出。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(i int, cannel chan bool, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        default:
            time.Sleep(time.Millisecond * 500)
            fmt.Println(i, "hello")
            // 正常工作
        case <-cannel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(i, cancel, &wg)
    }

    time.Sleep(time.Second)
    close(cancel)

    wg.Wait()
}

标准库在1.7 新增了 context 包,用来简化对于处理单个请求的多个 goroutine 之间与请求域的数据,超时 和 退出等操作:

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func worker(i int, ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
        select {
        default:
            time.Sleep(time.Millisecond * 500)
            fmt.Println(i, "hello")
            // 正常工作
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(i, ctx, &wg)
    }

    time.Sleep(time.Second)
    cancel()

    wg.Wait()
}

上一篇 下一篇

猜你喜欢

热点阅读