6 - 协程机制

2020-07-04  本文已影响0人  天命_风流

协程

package groutine

import (
    "fmt"
    "testing"
    "time"
)

func SpendTime(inner func(i int)) func(i int){
    return func(i int) {
        s := time.Now()
        inner(i)
        fmt.Println("Spend time:", time.Since(s))
    }
}

func ForGroutine(t int){
    for i := 0 ; i < t ; i++{
        go func(i int) {  // 使用协程执行这个函数,这个函数需要一个 int 参数
            fmt.Println(i)
            time.Sleep(500 * time.Millisecond)
        }(i)  // 为这个函数传入参数
    }
    time.Sleep( 1000 * time.Millisecond)
}

func TestGroutine(t *testing.T){
    stForGroutine := SpendTime(ForGroutine)
    stForGroutine(10)
}
// 输出:0-10(乱序),Spend time: 1.002028259s

并发控制

package shareMem

import (
   "sync"
   "testing"
   "time"
)

func TestCounter(t *testing.T) {
   counter := 0
   for i := 0 ; i < 5000 ; i++{
       go func() {
           counter++
       }()
   }
   time.Sleep(time.Second * 1)
   t.Log("counter: ", counter)  // 4577
}

func TestCounterSafe(t *testing.T) {
   counter := 0
   var mut = sync.Mutex{}  // 一把锁

   for i := 0 ; i < 5000 ; i++{
       go func() {
           defer func() {
               mut.Unlock()  // 解锁
           }()
           mut.Lock()  // 上锁
           counter++
       }()
   }
   time.Sleep(time.Second * 1)
   t.Log("counter: ", counter)  // 5000
}
func TestCounterWait(t *testing.T) {
    counter := 0
    var mut = sync.Mutex{}  // 一把锁
    var wg = sync.WaitGroup{}  // 等待锁

    for i := 0 ; i < 5000 ; i++{
        wg.Add(1)  // 添加一个等待量
        go func() {
            defer func() {
                wg.Done()  // 释放一个等待量
                mut.Unlock()  // 解锁
            }()
            mut.Lock()  // 上锁
            counter++
        }()
    }
    wg.Wait()
    t.Log("counter: ", counter)  // 5000
}

CSP 并发机制

CSP 可以解决什么问题

你可以发现,之前的 ForGroutine 中没有返回值,所以我们可以直接使用协程,而不必在意它的执行结果。如果我们需要将执行结果返回,这就会出现协程通信的问题,而 CSP 可以解决它(我猜的)

  1. 左边的方式,一方交出信息,需要等待另一方接收(会阻塞)
  2. 右边的方式,创建了一个 buffer ,内容生产的人不会等待其他人接收,而是可以将所有内容一股脑地放进去
package csp

import (
    "fmt"
    "testing"
    "time"
)

func service() string{
    time.Sleep(time.Millisecond * 50)
    return "Srevice done"
}

func otherTask() {
    fmt.Println("work on something else")
    time.Sleep(time.Millisecond * 100)
    fmt.Println("other task is done.")
}

func TestService(t *testing.T)  {
    fmt.Println(service())
    otherTask()
    // 输出:
    // Srevice done
    // work on something else
    // other task is done.
}
func AsyncService() chan string{
    retCh := make(chan string)  // 在这里创建了一个 channel,这个 channel 可以放 string,注意,它是阻塞的
    go func() {
        ret := service()  // 执行,需要等待 50 ms
        fmt.Println("returned result")
        retCh <- ret  // 将执行结果放入 channel,由于没有人取,所以是阻塞的
        fmt.Println("service exited")
    }()
    return retCh
}

func TestAsynService(t *testing.T) {
    retCh := AsyncService()  // 后面的函数会直接返回一个 channel,此时这个 channel 是空的
    otherTask()  // 在这里会执行 100 ms,期间 channel 会放入 "Srevice done"
    fmt.Println(<- retCh)  // 从 channel 取出数据,并输出,在这之后,阻塞才会消失
    time.Sleep(time.Second * 1)
    // 执行结果:
    // work on something else
    // returned result
    // other task is done.
    // Srevice done
    // service exited
}
func AsyncService() chan string{
    retCh := make(chan string, 1)  // 在这里创建了一个 channel,它有 buffer,容量为 1,不阻塞
    go func() {
        ret := service()  // 执行,需要等待 50 ms
        fmt.Println("returned result")
        retCh <- ret  // 将执行结果放入 channel,不管有没有人取,会直接执行下一行代码
        fmt.Println("service exited")
    }()
    return retCh
}

func TestAsynService(t *testing.T) {
    retCh := AsyncService()  // channel 空
    otherTask()  // 在这里会执行 100 ms,期间 channel 会放入 "Srevice done"
    fmt.Println(<- retCh)
    time.Sleep(time.Second * 1)
    // 执行结果:
    // work on something else
    // returned result
    // service exited
    // other task is done.
    // Srevice done
}

select 的选择机制

package _select

import (
    "fmt"
    "testing"
    "time"
)

func service() string{
    time.Sleep(time.Millisecond * 50)
    return "Srevice done"
}

func otherTask() {
    fmt.Println("work on something else")
    time.Sleep(time.Millisecond * 100)
    fmt.Println("other task is done.")
}

func AsyncService() chan string{
    retCh := make(chan string, 1)
    go func() {
        ret := service()
        fmt.Println("returned result")
        retCh <- ret
        fmt.Println("service exited")
    }()
    return retCh
}

func TestSelect(t *testing.T) {
    select {
    case ret := <- AsyncService():
        t.Log(ret)
    case <- time.After(time.Millisecond * 100):
        t.Error("time out")
    }
}
// returned result
// service exited
// Srevice done

channel 的关闭

package cancel

import (
    "fmt"
    "testing"
    "time"
)

func isCancelled(cancelChan chan struct{}) bool {
    select {
    case <- cancelChan:
        return true
    default:
        return false
    }
}

func cancel_1(cancelChan chan struct{}){
    cancelChan <- struct{}{}
}

func cancel_2(cancelChan chan struct{}){
    close(cancelChan)
}

func TestCancel(t *testing.T){
    cancelChan := make(chan struct{}, 0)

    for i := 0; i < 5 ; i++{
        go func(i int, cancelCh chan struct{}) {
            for {
                if isCancelled(cancelCh){
                    break
                }
                time.Sleep(time.Millisecond * 5)
            }
            fmt.Println(i, "Cancelled")
        }(i, cancelChan)
    }

    //cancel_1(cancelChan)  // 使用这个方法,只能关闭一个接收 channel 的接收者
    cancel_2(cancelChan)  // 使用它可以关闭所有接收者
    time.Sleep(time.Second * 1)
}

更为成熟的协程管理:context

package context

import (
    "context"
    "fmt"
    "testing"
    "time"
)

func isCancelled(ctx context.Context) bool {
    select {
    case <- ctx.Done():
        return true
    default:
        fmt.Println("not cancel")
        return false
    }
}

func TestCancel(t *testing.T) {
    // context.Background() 为根 context
    // ctx 为子 context
    ctx, cancel := context.WithCancel(context.Background())  // ctx 包含了一个 channel,cancel 是一个函数,可以关闭这个 context
    for i := 0 ; i < 5 ; i++{
        go func(i int, ctx context.Context) {  // 创建 5 个协程
            for {  // 每个协程不断探查是否被取消
                if isCancelled(ctx) {  // 如果取消,则会跳出循环
                    break
                }
                time.Sleep(time.Millisecond * 5)
            }
            fmt.Println(i, "Cancelled")  // 取消后打印消息
        }(i, ctx)
    }
    cancel()  // 创建完 5 个协程之后,发送取消信息。此时 ctx.Done() 将会为所有子节点发送消息
    time.Sleep(time.Second * 1)
}
上一篇 下一篇

猜你喜欢

热点阅读