Go并发控制简明教程-WaitGroup和Context简明教程

2020-12-20  本文已影响0人  危地马拉的大坑

控制并发的两种方式

WaitGroup简单例子

使用WaitGroup可以把一个作业分包,使用多个协程完成,节省作业处理时间。

func main(){
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("job 1 done.")
        wg.Done()
    }()

    go func() {
        time.Sleep(1 * time.Second)
        fmt.Println("job 2 done.")
        wg.Done()
    }()

    fmt.Println("Wait for job finish")
    wg.Wait()
}

声明一个WaitGroup,大小设置为2,表示有需要等待两个子协程完成。创建两个子协程,分别以睡眠代替作业负载,子协程结束前调用wg.Done()表示此任务已经完成。主协程在wg.Wait()时阻塞,等待子协程结束,countDown数为0时就继续执行。countDown数如何减少呢?通过wg.Done()完成的。

以下是输出:

~ » go run main.go
Wait for job finish
job 2 done.
job 1 done.

但是,我们会发现,创建一个子协程后,主协程无法控制子协程,只能等待,不能因为时间过长而发送信号通知子协程停止执行。

Channel + Selete控制协程

为了解决上面提到的问题,可以简单使用Channel + Selete来实现子协程执行时间过长后,主协程通知子协程结束返回的功能。

func main(){
    stop := make(chan bool)

    go func() {
        for {
            select {
            case <- stop:
                fmt.Println("job timeout return")
                return
            default:
                fmt.Println("job still working")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    stop <- true
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

创建一个布尔类型的Channel用于通知子协程是否停止。启动一个子协程模拟搞IO作业,里面有一个select,用于等待主协程的stop信号,如果没有此信号就执行default下面的语句。主协程5s后表示子协程执行超时,那么发送stop信号,子协程接收到信号后返回。主协程正常结束。

下面是执行日志:

~ » go run main.go
job still working
job still working
job still working
job still working
job still working
Timeout stop the job
job timeout return
Main goroutine finished!!

Context简单例子

上面的Channel + Selete简单例子只能解决一层主子协程的控制,如果子协程里也有子协程,那么此方法就无法奏效了。

不如下图,Worker2启动Job03协程工作,而Job03也要启动Job04协程配合工作。此时一旦Job04超时未返回,需要把此信号传递给Worker02,让它通知Job03和Job04停止工作,这种情况就很难解决了。

层级子协程

以此,Go引入了Context处理这种情况,Context是一种很好的设计模式。

下面使用Context改造Channel + Selete的例子:

func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <- ctx.Done():
                fmt.Println("job timeout return")
                return
            default:
                fmt.Println("job still working")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

改造点如下:

context.WithCancel方法用于创建一个可以发送取消信号的Context,它的入参需要一个父Context,此处使用了context.Background(),它正是根Context。调用cancel方法,会通过ctx.Done()通知子协程,然后使用select处理此信号。

Context控制多个子协程
func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go Work(ctx, "node1")
    go Work(ctx, "node2")
    go Work(ctx, "node3")

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

func Work(ctx context.Context, name string){
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

把子协程的方法抽取出来,我们尝试启动3个子协程,调用cancel()方法发送取消信号,所有子协程都停止了。

如下面打印日志所示:

~ » go run main.go
node2 job still working
node3 job still working
node1 job still working
node1 job still working
node3 job still working
node2 job still working
node3 job still working
node2 job still working
node1 job still working
node1 job still working
node2 job still working
node3 job still working
node3 job still working
node1 job still working
node2 job still working
Timeout stop the job
node2 job timeout return
node1 job timeout return
node3 job timeout return
Main goroutine finished!!

第二个例子,符合上面图的情况,主协程启动node1子协程,node1子协程启动node2子协程,那么只需要把Context传递下去,所有的子协程就能接受到主协程的取消信息,然后马上返回。

func main(){
    ctx, cancel := context.WithCancel(context.Background())

    go Work(ctx, "node1")

    time.Sleep(5 * time.Second)
    fmt.Println("Timeout stop the job")
    cancel()
    time.Sleep(5 * time.Second)
    fmt.Println("Main goroutine finished!!")
}

func Work(ctx context.Context, name string){
    go Work2(ctx, "node2")
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

func Work2(ctx context.Context, name string){
    for {
        select {
        case <- ctx.Done():
            fmt.Println(name, "job timeout return")
            return
        default:
            fmt.Println(name, "job still working")
            time.Sleep(1 * time.Second)
        }
    }
}

执行日志如下:

~ » go run main.go
node2 job still working
node1 job still working
node1 job still working
node2 job still working
node1 job still working
node2 job still working
node2 job still working
node1 job still working
node1 job still working
node2 job still working
Timeout stop the job
node2 job timeout return
node1 job timeout return
Main goroutine finished!!

晚安~

上一篇下一篇

猜你喜欢

热点阅读