go 的并发机制
并发与并行的概念
① 多线程程序在单核cpu上运行就是并发;
② 多线程程序在多核cpu上运行就是并行;
goroutine 特点
- 它是一个协程,是一个轻量级的线程。
-
非抢占式多任务
处理,由协程主动交出控制权。(下面会解释) - 编译器/解释器/虚拟机层面的多任务。
- 多个协程可能运行在多个线程之上,这个是由调度器去决定的。
协程是非抢占式的,由协程主动交出控制权,而线程是抢占式的,也就是由操作系统主动停掉一个线程让给其他线程执行。
非抢占式多任务
非抢占式的含义就是其他任务不会抢我的cpu,而是等待我主动让出去。别的goroutine才可以执行。而io等操作会主动让出去goroutine的执行权。
func main() {
var a [10]int
for i := 0;i< 10;i++{
go func(i int) {
for {
a[i]++
}
}(i)
}
time.Sleep(time.Second)
fmt.Println(a)
}
这段代码可能让人感觉 1s后就会退出阿,因为main函数会退出阿。其实不会的。这个程序会卡住。而其实main函数也是一个goroutine。而程序中对一个变量++ 是不会交出控制权的(io操作可以交出控制权,如fmt.println),所以main也得不到运行了。。可以看到这段程序死机了(cpu使用率400%,我是4核机器)
进程 USER PR NI VIRT RES SHR %CPU %MEM TIME+ COMMAND
20990 kiosk 20 0 102708 1440 1052 S 390.2 0.0 0:36.46 /tmp/___go_build_main_go
为了手动交出控制权,可以添加一行 runtime.Gosched()
普通函数是协程的一个特例。因为协程中,main和dowork是可以相互交互的。
调度器会在合适的点进行切换,不需要人来介入。
goroutine的可能切换点
I/O select
channel
函数调用
runtime.Gosched()
等待锁
其他
多个协程可能运行于多个线程之上。
还是以刚才的例子。观察goroutine 运行占cpu 362。他开启了6个线程,但实质上只运行了4个(看下图的pidstat输出),因为我的机器是4核。他运行在4核CPU上。go的调度器是很智能的。
pidstat -t -p 21312 1
Linux 4.15.0-50-generic (Prometheus) 2019年06月05日 _x86_64_ (4 CPU)
23时34分45秒 UID TGID TID %usr %system %guest %wait %CPU CPU Command
23时34分46秒 1000 21312 - 100.00 1.00 0.00 0.00 100.00 3 ___go_build_mai
23时34分46秒 1000 - 21312 0.00 0.00 0.00 0.00 0.00 3 |_____go_build_mai
23时34分46秒 1000 - 21313 0.00 0.00 0.00 0.00 0.00 2 |_____go_build_mai
23时34分46秒 1000 - 21314 98.00 0.00 0.00 2.00 98.00 2 |_____go_build_mai
23时34分46秒 1000 - 21315 100.00 0.00 0.00 0.00 100.00 1 |_____go_build_mai
23时34分46秒 1000 - 21316 96.00 0.00 0.00 4.00 96.00 3 |_____go_build_mai
23时34分46秒 1000 - 21317 98.00 0.00 0.00 2.00 98.00 0 |_____go_build_mai
M,P,G
Go 的调度器内部有三个十分重要的结构,M,P,G。(M>P 如上个例子的 6 个M,4个P)
-
M
表示真正的内核OS线程,和POSIX里的thread差不多,真正干活的人。 -
P
表示调度等上下文,可以把他看做一个局部的调度器,使go代码在一个线程上跑,它是实现从N:1 (多个用户线程在一个内核线程上跑)到 N:M 映射的关键。 -
G
代表一个 goroutine,它有自己的栈,用于调度。
M,P,G
上图表示有两个物理线程M,每个M都拥有一个context(P),每一个P上又拥有一个正在运行的G和很多等待运行的G。
P 的总数量可以通过 GOMAXPROCS() 设置。它表示真正的并发量,即有多少个goroutine可以同时运行。
上面等待的(灰色)goroutine处于ready的就绪态。而每个P都维护着一个队列(runqueue)
当一个M(线程)阻塞了,P(调度器)可以转而投奔另一个OS线程,当一个OS线程M0 阻塞,P转而在OS线程M1 运行。调度器保证有足够的线程来运行所有的P。(如之前看到的4个P,6个M)
当M0返回时,它必须尝试取得一个 context P 调度器,一般情况下,会从其他的OS 线程上偷steal一个P过来。
如果没有偷到的话,他就把goroutine放到global runqueue 中,自己睡眠(放回线程池)。P 也会周期性检查global runqueue。
M,P,G
另一种情况就是某一个P所分配的任务G很快被执行完了(分配不均),这就导致一个context P 闲着,如果 global runqueue 上没有 G 了,那么它会偷其他P 的G。一般偷的话会偷一半。确保每个OS线程都能得到充分的使用。
这段参考知乎 Golang 的 goroutine 是如何实现的? Yi Wang 的回答
CSP 模型 channel
看下面的一段代码。这里的使用函数式编程,以一个函数创建一个通道。
func CreateWorker(id int) chan<- int { // 返回一个只允许往里送数据的chan。
c := make(chan int)
go func() {
for {
n, ok := <-c // 自己在goroutine 里收数据
if !ok {break}
fmt.Printf("Worker %d received %d \n",id,n)
}
}()
return c
}
func main() {
var c [5]chan<- int
for i := 0;i < 5; i++{
c[i] = CreateWorker(i) //创建5个没有buffer的通道 返回值是只允许发数据的chan
}
for j := 0;j < 5; j++ {
c[j] <- j // 通道里写值来确保一个任务结束
close(c[j]) // close 不是说必须的, 但是关闭通道的最好是发送方!!!
}
}
执行结果:
Worker 0 received 0
Worker 1 received 1
Worker 2 received 2
Worker 3 received 3
这个函数有个问题啊。为什么打印出的是4个不是5个呢,在最后一个打印时,gorounting结束了,但是最后一个还没来得及打印。
如下改造, 加一个 done,当 done 中的数据被取出来后,打印的动作肯定也就完成了。
type Worker struct {
In chan int
Done chan bool
}
func doWorker(id int,w Worker) {
for n := range w.In {
fmt.Printf("Worker %d received %c \n",id,n) // 先打印再down
w.Done <- true
}
}
func CreateWorker(id int) Worker {
w := Worker{
In: make(chan int),
Done: make(chan bool),
}
go doWorker(id,w)
return w
}
func main() {
var workers [5]Worker
for i := 0;i < 5; i++{
workers[i] = CreateWorker(i)
}
for j := 0;j < 5; j++ {
workers[j].In <- 'a'+j
<- workers[j].Done // 将数据送进去之后,等待work打印完成(等待down)
}
}
执行结果
Worker 0 received a
Worker 1 received b
Worker 2 received c
Worker 3 received d
Worker 4 received e
close 了的channel还可以接受数据吗?
通道被关闭,是还可以接着收数据的。如下面的代码 (呼应了通道的关闭最好是发送方!!)
close 了的channel 关闭了就不能再发送数据了,这里就不做解释了。
func worker(c chan int) {
for {
fmt.Printf("Worker recived %d\n",<-c)
}
}
func main() {
c := make(chan int)
go worker(c)
c <- 'a'
c <- 'b'
c <- 'd'
close(c)
time.Sleep(50 * time.Microsecond)
}
执行结果
Worker recived 97
Worker recived 98
Worker recived 100
Worker recived 0
Worker recived 0
Worker recived 0
Worker recived 0
....
当一个channel 被关闭了,就会一直收到 0。怎么避免呢?
- 方法一: ok-parten 模式
所有的 channel 接收者都会在 channel 关闭时,立立刻从阻塞等待中返回且 ok 值为 false。
func worker( c chan int) {
for {
if n,ok := <-c ;ok {
fmt.Printf("Worker recived %d\n",n)
}
}
}
- 方法二 :range 模式
func worker( c chan int) {
for n := range c{
fmt.Printf("Worker recived %d\n",n)
}
}
打印怎么是顺序的?
这里 还有一个问题,打印的数据是按顺序的,这和直接按顺序打印没有区别了。(因为这里每打印一次就要等待一个down,down了才能开始下一次)
将done 和 打印分开就可以了
type Worker struct {
In chan int
Done chan bool
}
func doWorker(id int,w Worker) {
for n := range w.In {
fmt.Printf("Worker %d received %c \n",id,n) // 先打印再down
go func(w Worker) {
w.Done <- true
}(w)
}
}
func CreateWorker(id int) Worker {
w := Worker{
In: make(chan int),
Done: make(chan bool),
}
go doWorker(id,w)
return w
}
func main() {
var workers [20]Worker
for i := 0;i < 20; i++{
workers[i] = CreateWorker(i)
}
for i,worker := range workers {
worker.In <- 'a'+i // 专心往里送数据,然后打印
}
for _,worker := range workers {
<- worker.Done // 等待完成这件事放到最后面
}
}
执行结果:
Worker 0 received a
Worker 3 received d
Worker 5 received f
Worker 2 received c
Worker 1 received b
Worker 4 received e
Worker 6 received g
...
上述的方法还是不够优雅,看下面的
WaitGroup 去并发任务(确保任务都执行)
引入WaitGroup,当所有的任务都完成才退出。还有waitgroup确保一个任务被执行。
type Worker struct {
In chan int
Done func() // 函数式编程,Done 去调用 wg.Done
}
func doWorker(id int,w Worker) {
for n := range w.In {
fmt.Printf("Worker %d received %c \n",id,n) // 先打印再down
w.Done()
}
}
func CreateWorker(id int,wg *sync.WaitGroup) Worker { // 这里的wg必须是指针
w := Worker{
In: make(chan int),
Done: func() {
wg.Done() // 函数式编程,将wg.Done() 放在函数中
},
}
go doWorker(id,w)
return w
}
func main() {
var workers [20]Worker
var wg sync.WaitGroup
for i := 0;i < 20; i++{
wg.Add(1)
workers[i] = CreateWorker(i,&wg)
}
for i,worker := range workers {
worker.In <- 'a'+i
}
wg.Wait()
}
执行结果:
乱序打印
锁争抢
看以下代码。下面的代码是一段有问题的代码。
func main() {
cnt := 0
var wg sync.WaitGroup
for i := 0; i < 500 ; i++{
wg.Add(1)
go func(wg *sync.WaitGroup) {
cnt++
wg.Done()
}(&wg)
}
wg.Wait()
fmt.Println(cnt)
}
执行结果:
498
使用 go run -race 去检测一下。
$ go run -race goroutine.go
==================
WARNING: DATA RACE
Read at 0x00c0000a6010 by goroutine 7:
main.main.func1()
/home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x38
Previous write at 0x00c0000a6010 by goroutine 6:
main.main.func1()
/home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:45 +0x4e
Goroutine 7 (running) created at:
main.main()
/home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4
Goroutine 6 (finished) created at:
main.main()
/home/weijiaxiang/go/src/socket/demo/goroutine/goroutine.go:44 +0xe4
==================
496
Found 1 data race(s)
上面显示 cnt++ 即在读又在写
为了解决这种线程不安全,就需要加锁。经过改造得。这下的结果就正确了,结果为500。
func main() {
cnt := 0
var wg sync.WaitGroup
var lock sync.Mutex
for i := 0; i < 500 ; i++{
wg.Add(1)
go func(wg *sync.WaitGroup) {
lock.Lock()
defer lock.Unlock()
cnt++
wg.Done()
}(&wg)
}
wg.Wait()
fmt.Println(cnt)
}
执行结果:
500
$ go run -race goroutine.go
500
本例子是对一个变量进行++ 操作。这是线程不安全的。所以这种操作还是加锁比较安全
select 多路选择和超时
当select 中没有default的话,哪个个case收到值了,就执行并返回,否则一直阻塞等待。
func createWorker() <-chan int {
ch := make(chan int)
go func() {
randInt := rand.New(rand.NewSource(time.Now().Unix())).Intn(10) //产生随机数
time.Sleep(time.Duration(randInt)*time.Second) // 睡眠随机秒
ch <- randInt // 扔进去一个channel
}()
return ch
}
func main() {
ch1 := createWorker()
ch2 := createWorker()
select { // 2个都不返回的话就阻塞,一个返回就彻底运行结束
case n := <- ch1:
fmt.Println("ch1 get ",n)
case n := <- ch2:
fmt.Println("ch2 get ",n)
}
}
执行结果:
ch2 get 6
那么select 有什么用呢?select 和 select,poll,epoll 类似。就是监听 IO 操作,当IO 操作发生时,就触发相应的动作,否则就阻塞。
- 用处一
超时等待
加上一个 time.After 。如果5s 还没有数据返回,就不阻塞了。
ch1 := createWorker()
ch2 := createWorker()
select {
case n := <- ch1:
fmt.Println("ch1 get ",n)
case n := <- ch2:
fmt.Println("ch2 get ",n)
case <- time.After(5*time.Second):
fmt.Println("get value timeout ")
}
- 用处二
判断channel是否满或空
因为ch1 和 ch2 都是空,所以就执行到 default,那么就可以判断所有的通道是否为空了。
ch1 := make (chan int, 1)
ch2 := make (chan int, 1)
select {
case <-ch1:
fmt.Println("ch1 pop one element")
case <-ch2:
fmt.Println("ch2 pop one element")
default:
fmt.Println("default")
}