6 - 协程机制
2020-07-04 本文已影响0人
天命_风流
协程
- Go 从底层就开始支持协程机制,这让它使用协程变得非常容易
- 下面的代码,使用 SpendTime 作为一个记时的装饰器
- 在 ForGroutine 中使用 go 关键字作为协程入口
package groutine
import (
"fmt"
"testing"
"time"
)
func SpendTime(inner func(i int)) func(i int){
return func(i int) {
s := time.Now()
inner(i)
fmt.Println("Spend time:", time.Since(s))
}
}
func ForGroutine(t int){
for i := 0 ; i < t ; i++{
go func(i int) { // 使用协程执行这个函数,这个函数需要一个 int 参数
fmt.Println(i)
time.Sleep(500 * time.Millisecond)
}(i) // 为这个函数传入参数
}
time.Sleep( 1000 * time.Millisecond)
}
func TestGroutine(t *testing.T){
stForGroutine := SpendTime(ForGroutine)
stForGroutine(10)
}
// 输出:0-10(乱序),Spend time: 1.002028259s
- 你会发现,Go 中使用协程很方便,远比 Python 中引入协程框架简单
并发控制
- 加锁(互斥锁):
package shareMem
import (
"sync"
"testing"
"time"
)
func TestCounter(t *testing.T) {
counter := 0
for i := 0 ; i < 5000 ; i++{
go func() {
counter++
}()
}
time.Sleep(time.Second * 1)
t.Log("counter: ", counter) // 4577
}
func TestCounterSafe(t *testing.T) {
counter := 0
var mut = sync.Mutex{} // 一把锁
for i := 0 ; i < 5000 ; i++{
go func() {
defer func() {
mut.Unlock() // 解锁
}()
mut.Lock() // 上锁
counter++
}()
}
time.Sleep(time.Second * 1)
t.Log("counter: ", counter) // 5000
}
- 等待(用于并发同步)
func TestCounterWait(t *testing.T) {
counter := 0
var mut = sync.Mutex{} // 一把锁
var wg = sync.WaitGroup{} // 等待锁
for i := 0 ; i < 5000 ; i++{
wg.Add(1) // 添加一个等待量
go func() {
defer func() {
wg.Done() // 释放一个等待量
mut.Unlock() // 解锁
}()
mut.Lock() // 上锁
counter++
}()
}
wg.Wait()
t.Log("counter: ", counter) // 5000
}
CSP 并发机制
- CSP 是 Go 中特有的并发机制,它的特性有点类似与 future
CSP 可以解决什么问题
你可以发现,之前的 ForGroutine 中没有返回值,所以我们可以直接使用协程,而不必在意它的执行结果。如果我们需要将执行结果返回,这就会出现协程通信的问题,而 CSP 可以解决它(我猜的)
-
channel 一个通信通道,它有两种方式:
channel 的两种方式
- 左边的方式,一方交出信息,需要等待另一方接收(会阻塞)
- 右边的方式,创建了一个 buffer ,内容生产的人不会等待其他人接收,而是可以将所有内容一股脑地放进去
- 下面是一个串行程序:
package csp
import (
"fmt"
"testing"
"time"
)
func service() string{
time.Sleep(time.Millisecond * 50)
return "Srevice done"
}
func otherTask() {
fmt.Println("work on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("other task is done.")
}
func TestService(t *testing.T) {
fmt.Println(service())
otherTask()
// 输出:
// Srevice done
// work on something else
// other task is done.
}
- 下面使用了协程和 channel ( 无 buffer )
func AsyncService() chan string{
retCh := make(chan string) // 在这里创建了一个 channel,这个 channel 可以放 string,注意,它是阻塞的
go func() {
ret := service() // 执行,需要等待 50 ms
fmt.Println("returned result")
retCh <- ret // 将执行结果放入 channel,由于没有人取,所以是阻塞的
fmt.Println("service exited")
}()
return retCh
}
func TestAsynService(t *testing.T) {
retCh := AsyncService() // 后面的函数会直接返回一个 channel,此时这个 channel 是空的
otherTask() // 在这里会执行 100 ms,期间 channel 会放入 "Srevice done"
fmt.Println(<- retCh) // 从 channel 取出数据,并输出,在这之后,阻塞才会消失
time.Sleep(time.Second * 1)
// 执行结果:
// work on something else
// returned result
// other task is done.
// Srevice done
// service exited
}
- 下面使用协程和 channel (带 buffer,不阻塞)
func AsyncService() chan string{
retCh := make(chan string, 1) // 在这里创建了一个 channel,它有 buffer,容量为 1,不阻塞
go func() {
ret := service() // 执行,需要等待 50 ms
fmt.Println("returned result")
retCh <- ret // 将执行结果放入 channel,不管有没有人取,会直接执行下一行代码
fmt.Println("service exited")
}()
return retCh
}
func TestAsynService(t *testing.T) {
retCh := AsyncService() // channel 空
otherTask() // 在这里会执行 100 ms,期间 channel 会放入 "Srevice done"
fmt.Println(<- retCh)
time.Sleep(time.Second * 1)
// 执行结果:
// work on something else
// returned result
// service exited
// other task is done.
// Srevice done
}
- 注意 channel 和 buffer channel 的区别
select 的选择机制
- 使用 select 在不同 channel 之间进行选择
- 对不同的 channel,可以定制不同的行为
-
time.After( time.Second * 1 ) 会返回一个 channel,它常常用于超时控制
image.png
package _select
import (
"fmt"
"testing"
"time"
)
func service() string{
time.Sleep(time.Millisecond * 50)
return "Srevice done"
}
func otherTask() {
fmt.Println("work on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("other task is done.")
}
func AsyncService() chan string{
retCh := make(chan string, 1)
go func() {
ret := service()
fmt.Println("returned result")
retCh <- ret
fmt.Println("service exited")
}()
return retCh
}
func TestSelect(t *testing.T) {
select {
case ret := <- AsyncService():
t.Log(ret)
case <- time.After(time.Millisecond * 100):
t.Error("time out")
}
}
// returned result
// service exited
// Srevice done
channel 的关闭
- 使用 close( ch ) 可以立即关闭一个 channel
- 如果继续向 关闭的 channel 发送数据,会导致 panic
- 数据接收放可以使用 v, ok <- ch ,其中 ok 为 boll 值,true 表示正常,false 表示通道关闭
- 如果没有设置 ok ,则收到的 v 为零值
- 所有的 channel 接收者都会在 channel 关闭时,立刻从阻塞等待中 ok 的位置返回 false
- 使用上面的机制,可以设置 channel 的退出信号
- 使用这个机制,我们可以关闭一个任务:
package cancel
import (
"fmt"
"testing"
"time"
)
func isCancelled(cancelChan chan struct{}) bool {
select {
case <- cancelChan:
return true
default:
return false
}
}
func cancel_1(cancelChan chan struct{}){
cancelChan <- struct{}{}
}
func cancel_2(cancelChan chan struct{}){
close(cancelChan)
}
func TestCancel(t *testing.T){
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5 ; i++{
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelCh){
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
//cancel_1(cancelChan) // 使用这个方法,只能关闭一个接收 channel 的接收者
cancel_2(cancelChan) // 使用它可以关闭所有接收者
time.Sleep(time.Second * 1)
}
更为成熟的协程管理:context
- context 包提供了很多管理
- 使用下面的代码,可以实现和上面的代码同样的功能
package context
import (
"context"
"fmt"
"testing"
"time"
)
func isCancelled(ctx context.Context) bool {
select {
case <- ctx.Done():
return true
default:
fmt.Println("not cancel")
return false
}
}
func TestCancel(t *testing.T) {
// context.Background() 为根 context
// ctx 为子 context
ctx, cancel := context.WithCancel(context.Background()) // ctx 包含了一个 channel,cancel 是一个函数,可以关闭这个 context
for i := 0 ; i < 5 ; i++{
go func(i int, ctx context.Context) { // 创建 5 个协程
for { // 每个协程不断探查是否被取消
if isCancelled(ctx) { // 如果取消,则会跳出循环
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled") // 取消后打印消息
}(i, ctx)
}
cancel() // 创建完 5 个协程之后,发送取消信息。此时 ctx.Done() 将会为所有子节点发送消息
time.Sleep(time.Second * 1)
}
- 上面只是一个小例子,具体内容需要自己深入探索