Golang干货

Go语言学习笔记七(并发编程)

2019-08-10  本文已影响0人  Jabir_Zhang

协程机制

Golang 线程和协程的区别

备注:需要区分进程、线程(内核级线程)、协程(用户级线程)三个概念。

进程、线程 和 协程 之间概念的区别

对于进程、线程,都是有内核进行调度,有CPU时间片的概念,进行抢占式调度(有多种调度算法)

对于协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的,因为是由用户程序自己控制,那么就很难像抢占式调度那样做到强制的 CPU 控制权切换到其他进程/线程,通常只能进行协作式调度,需要协程自己主动把控制权转让出去之后,其他协程才能被执行到。

goroutine 和协程区别

本质上,goroutine 就是协程。 不同的是,Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的CPU (P) 转让出去,让其他 goroutine 能被调度并执行,也就是 Golang 从语言层面支持了协程。Golang 的一大特色就是从语言层面原生支持协程,在函数或者方法前面加 go关键字就可创建一个协程。

func TestGroutine(t *testing.T) {
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Println(i)
        }(i)
    }
    time.Sleep(time.Millisecond * 50)
}

这里面额外在内部func里传了i,注意这里如果不传i,直接用的话是不可行的,因为使用go创建协程,但是需要注意的是:协程函数的par作为参数是外部i的数据拷贝。

其他方面的比较
  1. 内存消耗方面
  1. 线程和 goroutine 切换调度开销方面

共享内存并发机制

先来看这样一段代码

func TestCounter(t *testing.T) {
    counter := 0
    for i := 0; i < 5000; i++ {
        go func() {
            counter++
        }()
    }
    time.Sleep(1 * time.Second)
    t.Logf("counter = %d", counter)

}

正常来说counter应该最后的结果是5000,但是实际结果每次都不相同且都小于5000。其中存在两个问题,第一个协程不安全,需要对协程加锁,因为每个协程都在修改counter;第二个因为是异步,主协程并不会等待所有子协程的结束,因此不能保证打印结果的时候,counter加到5000。
那如何保证最后的结果输出是5000呢?可以做以下两个操作

Lock(锁)

很明显保证线程安全就需要锁,看一下Go的锁如何使用:

func TestCounterThreadSafe(t *testing.T) {
    var mut sync.Mutex
    counter := 0
    for i := 0; i < 5000; i++ {
        go func() {
            defer func() {
                mut.Unlock()
            }()
            mut.Lock()
            counter++
        }()
    }
    time.Sleep(1 * time.Second)
    t.Logf("counter = %d", counter)

}
WaitGroup

多个协程等待全部执行完毕,可以用WaitGroup:

func TestCounterWaitGroup(t *testing.T) {
    var mut sync.Mutex
    var wg sync.WaitGroup
    counter := 0
    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer func() {
                mut.Unlock()
            }()
            mut.Lock()
            counter++
            wg.Done()
        }()
    }
    wg.Wait()
    t.Logf("counter = %d", counter)

}

CSP并发机制

CSP vs. Actor

传统的并发模型主要分为 Actor 模型和CSP模型,CSP模型全称为 communicating sequential processes,CSP 模型由并发执行实体(进程,线程或协程),和消息通道组成,实体之间通过消息通道发送消息进行通信。

channel分为两种:unbuffered channelbuffered channel
unbuffered channel
如图unbuffered channel中,通信双方同时都在Channel上,否则协程会阻塞,直到双方完成通信。
buffered channel
如图buffered channel中,我们给channel设置一个容量,只要容量未满,发送消息者就可以往里面放消息,相反对于接收方,只要里面有消息,他就可以取。放容量满了以后,发送方只能等接收方取走一条消息后才可以发送,接收方也只能等里面有消息后才会去取。
channel 的基本操作

先讲一下channel的基本操作:

//创建channel
ch := make(chan int)

// 写入channel
ch <- x

// 从channel读取
x <- ch

// another way to read
x = <- ch

//关闭channel
close(ch)

channel 一定要初始化后才能进行读写操作,否则会永久阻塞。

func service() string {
    time.Sleep(time.Millisecond * 50)
    return "Done"
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Millisecond * 100)
    fmt.Println("Task is done.")
}

func AsyncService() chan string {
    retCh := make(chan string)
    go func() {
        ret := service()
        fmt.Println("returned result.")
        retCh <- ret
        fmt.Println("service exited.")
    }()
    return retCh
}

func TestAsynService(t *testing.T) {
    fmt.Println(service())
    otherTask()
    retCh := AsyncService()
    otherTask()
    fmt.Println(<-retCh)
    time.Sleep(time.Second * 1)
}
=======打印结果=======
Done
working on something else
Task is done.
working on something else
returned result.
Task is done.
Done
service exited.

TestAsynService方法中前两行是串行的,大家可见打印结果即使有延时,也是按顺序打印。AsyncService的方法返回了一个channel,此channel是unbuffered channel,因此会协程阻塞,因此打印“service exited”会一直等到打印“Done”之后。

func AsyncService() chan string {
    retCh := make(chan string, 1)
    go func() {
        ret := service()
        fmt.Println("returned result.")
        retCh <- ret
        fmt.Println("service exited.")
    }()
    return retCh
}
=======打印结果=======
Done
working on something else
Task is done.
working on something else
returned result.
service exited.
Task is done.
Done

我把AsyncService里面初始化的channel变为buffered channel,可见打印“service exited.”就不会被阻塞,打印在了“Done”前。

channel的关闭

有关 channel 的关闭,你需要注意以下事项:

func dataProducer(ch chan int, wg *sync.WaitGroup) {
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        close(ch)
        wg.Done()
    }()
}

func dataReceiver(ch chan int, wg *sync.WaitGroup) {
    go func() {
        for {
            if data, ok := <-ch; ok {
                fmt.Println(data)
            } else {
                break
            }
        }
        wg.Done()
    }()
}

func TestCloseChannel(t *testing.T) {
    var wg sync.WaitGroup
    ch := make(chan int)
    wg.Add(1)
    dataProducer(ch, &wg)
    wg.Add(1)
    dataReceiver(ch, &wg)
    wg.Wait()
}

多路选择和超时

select是go语言中常用的一个关键字,官方解释:select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。

多路选择

对于select的理解有以下几点:

//channel
func AsyncService() chan string {
    retCh := make(chan string)
    go func() {
        time.Sleep(time.Millisecond * 50)
        retCh <- "Done"
    }()
    return retCh
}

//测试select含有default
func TestSelectDefault(t *testing.T) {
    select {
    case ret := <- AsyncService():
        t.Logf("result: %s", ret)
    default:
        t.Error("No one returned")
    }
}
---------------------------------------------------------
打印结果:No one returned
---------------------------------------------------------

//测试select不含有default
func TestSelect(t *testing.T) {
    select {
    case ret := <- AsyncService():
        t.Logf("result: %s", ret)
    }
}
---------------------------------------------------------
打印结果:result: Done
---------------------------------------------------------

打印结果发现有default子句,执行default;如果没有default子句,select将阻塞,直到channel返回值。

超时

select可以设置超时,具体代码如下

func AsyncService() chan string {
    retCh := make(chan string)
    go func() {
        time.Sleep(time.Millisecond * 500)
        retCh <- "Done"
    }()
    return retCh
}

func TestTimeOut(t *testing.T) {
    select {
    case ret := <- AsyncService():
        t.Logf("result: %s", ret)
    case <- time.After(time.Millisecond * 100):
        t.Error("time out")
    }
}
---------------------------------------------------------
打印结果:time out
---------------------------------------------------------

因为channel延迟了500毫秒,因此超时了,所以走到了超时的case中;如果把上面延迟500毫秒改成50毫秒,则正常走到了打印channel的case中。
另外,select是可以使用break,case中使用了break后,走到此case中执行到break后就不执行break之后的代码。

Context 与任务取消

树状图
如何取消中间任务handle(Req1),并且同时取消子任务,这时可以用到Context
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}

func TestCancel(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < 5; i++ {
        go func(i int, ctx context.Context) {
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5)
            }
            fmt.Println(i, "Cancelled")
        }(i, ctx)
    }
    cancel()
    time.Sleep(time.Second * 1)
}
---------------------------------------------------------
1 Cancelled
4 Cancelled
3 Cancelled
2 Cancelled
0 Cancelled
---------------------------------------------------------

参考

Golang 之协程详解
由浅入深剖析 go channel

上一篇下一篇

猜你喜欢

热点阅读