Go语言中的并发进程以及通道通信
goroutine
在go语言中要并发几个进程很简单
- 定义一个函数
- 用go运行一下
例如下面的例子,主程序main开出一个goroutine
package main
import(
"fmt"
)
func f(){
for i:=1;i<100;i++{
if i % 7==0 {
fmt.Println(i)
}
}
}
func main(){
fmt.Println("Started in main process")
fmt.Println("forking to goroutine")
go f()
fmt.Println("the main process ends here")
}
这样一段简单代码,拿去一跑,死的很难看,7的倍数一个都没有打印出来,程序就跑完了。原因是主程序还没有等f跑完就回收内存,卷铺盖走人了.
正确的方式是,在主程序中加入一个开关,当开出goroutine后,按下开关,然后就在那边等goroutine按起开关
package main
import(
"fmt"
"sync"
)
func f(wg *sync.WaitGroup){
for i:=1;i<100000000;i++{
if i % 7==0 {
fmt.Println(i)
}
}
wg.Done() //按起开关
}
func main(){
var wg sync.WaitGroup //开关
fmt.Println("Started in main process")
fmt.Println("forking to goroutine")
wg.Add(1) //按下开关
go f(&wg)
wg.Wait() // 等待
fmt.Println("the main process ends here")
}
这里我们看到了简单的进程间协同通讯,用的是一个共享的指针。在Go中,进程间可以通过通道channel来传输数据。
Go Channel 语义
channel 变量的声明通过make来生成
ch := make(chan int) // 无缓冲的channel
ch := make(chan int, 3) // 缓冲量为3的channel, 0 表示没有buffer的channel
对channel变量的操作,有发送和接收以及关闭3种.
接收和发送两种操作都是用向左箭头表示(<-
) ,两种不同的意思通过channel变量与<-
左右关系来表示。
ch <- x //发送操作
x = <-ch // 接收操作
从字面的意思来说,通道是中间路带,发送和接收两种操作的主语应该是另外一个变量(上面的x
).
channel的receive操作
channel 的buffer
我们可以把无缓冲的通道想像成,一手给另外一手接。
无缓冲的管道
把有缓冲的通道想想成一条船,有5个缓冲的通道一次可以载5人。
有缓冲的管道
ch :=make(cha int) // unbuffered channel
pool :=make(chan int,2) // buffered channel
无缓冲channel的阻塞性
无缓冲的通道不管是send操作还是receive操作都会阻塞程序。在一个goroutine里面上数据send入管道,在另外一个goroutine里面接收这个数据前,前面的那个goroutine会一直在那边等,直至另外一个goroutine里开始receive数据。同样的,在一个goroutine里面进行一次receive 操作的话,如果没有其他一个goroutine进行send 操作的话,前面那个goroutine会一直在那边等。下面程序,将同一个通道的send和receive操作一起放在主程序里面跑,编译能通过,但是运行会出现下面的错误:
fatal error: all goroutines are asleep - deadlock!
package main
import(
"fmt"
)
func main(){
pool :=make(chan int,0) // unbuffered channel
fmt.Println("sending a number to channel")
pool<- 10
fmt.Println("getting it back")
<-pool
// running deadlock
}
将pool 的buffer size 改成1 就能正常运行。
有缓冲channel 的阻塞性
- send操作(ch <- x)相当于将变量x的值放入管道,当管道还没有满的时候,该操作会立即执行,程序不会被阻塞在这里。当管道满时, 程序就会阻塞在这边,等待通道里面数据被取走(接收)一个后,就可以进行这里的send操作了。如果有多个等待send操作的语句,系统会安排这边被阻塞的send操作是否被执行。这边所说的系统是go 的运行时(runtime),它负责程序的总调度。
- Receive操作(x := <-ch) 相当于将管道队列里面的值取出来,腾空管道里面的位置。当管道里面不是空的时候,该操作会立即执行,程序继续往下执行。当管道被取空时,程序在这里阻塞,等待有新的值被send入管道。同样,多个等待receive操作的语句,会由go的运行时决定安排谁率先获得操作权。
package main
import(
"fmt"
"time"
)
func main(){
pool :=make(chan int,2) // buffered channel
go func(){
pool <- 1 //feed pool with 1 value
fmt.Println("sending 1")
}()
go func(){
pool <- 2
fmt.Println("sending 2")
}()
go func(){
pool <- 3
fmt.Println("sending 3")
}()
fmt.Println("=")
time.Sleep(1 * time.Second)
x := <-pool
fmt.Printf("%d received\n",x)
fmt.Println("==")
time.Sleep(2 * time.Second)
x = <-pool
fmt.Printf("%d received\n",x)
fmt.Println("=====")
time.Sleep(5 * time.Second)
x = <-pool
fmt.Printf("%d received\n",x)
fmt.Println("=")
time.Sleep(1 * time.Second)
close(pool)
}
这里用3个go语句来开启3个goroutine, 由于3个goroutine 可以看成3个独立的线程,3者中哪个先打印出来顺序是随机的。
调整buffer size, 程序有不同的反应, 当 buffer size 为0 时,输出结果可能是这样的:
=
sending 1
1 received
==
sending 2
2 received
=====
3 received
=
sending 3
当buffer size 为1时:
sending 1
=
sending 2
1 received
==
2 received
=====
sending 3
3 received
=
当buffer size 为2时:
=
sending 2
sending 1
1 received
sending 3
==
2 received
=====
3 received
=
当buffer size 为3时:
=
sending 2
sending 1
sending 3
1 received
==
2 received
=====
3 received
=
channel range 枚举操作
用for range 操作可以循环不断的从一个通道中receive 数据,直到该通道关闭。如果发送端再也不发送数据,而通道中的数据被取完并且通道没有被关闭,程序会报deadlock错误,停止工作。
package main
import(
"fmt"
"time"
)
func main(){
pool :=make(chan int) // unbuffered channel
go func(){
for x :=0;x<10;x++{
fmt.Printf("sending:%d\n",x)
pool <- x
time.Sleep(1 * time.Second)
}
close(pool) //重要
}()
for x := range pool{
// this loop will continue to fetch data from the channel until channel is closed
// when there is no feeding on the sender side and channel is not closed
// fetching data from channel will encounter deadlock panic
fmt.Println( x*x)
}
}
其中
for x := range pool{
...
}
相当于
for{
x, ok := <-pool
if !ok{ //channel was closed and drained
break
}
....
}
//当通道上的数据都取完,并已关闭时, 在从通道上取数据会得到零值(zero value)
// 同时上面的ok会得到false值
// 本段代码来自于《go programming language》
关闭的channel
- 当用
close(ch)
语句关闭后,如果再用x = <-ch
来接收数据将得到零值,而for x:=range {}
语句则不会执行。如上面代码所示 - 在关闭的channel上执行send操作,会导致程序报错退出。
- 在一个未关闭的通道上,执行receive操作次数(包括
range
操作)大于send的次数,将会导致程序报错退出。
select添加监听器
当通道用于goroutine的调度时,可以使用下面的循环模式,配合select语句来监听来自不同通道的数据通讯。
for{
select{
case <-ch1
//...
case x:=<-ch2
//...
case ch3 <-y
default:
//...
}
if ...{
break
}
}
select语句会等待这些通道发出动静,其中任意一个通道上有通讯时,就触发这个case下面的语句,其他case下语句保持不变。一个没有case的select{}会永远等待。
单一方向的channel
如果通道作为函数的参数是,可以指明该通道的方向。这样在函数内部对通道只能进行send或receive操作。有了这种限定,会在编译时就能发现代码的错误。
func counter(output chan<-int, input <-chan int){
for v :=range input{
output <-x
}
close(output)
}
// 例子来自《Go programming language》
例子1: 每0.1秒出一个字符的随机字符生成器
time.Tick函数返回一个通道,该通道能够周期性的发出信号来。
package main
import (
"fmt"
"math/rand"
"time"
)
func main(){
tick :=time.Tick(100* time.Millisecond)
rand.Seed(time.Now().Unix())
for n:=0;n<10;n++{
select{
case <-tick:
var s int
for{
s = rand.Intn(122)
if (s>64 && s<91)|| (s>96 && s<123){ // 字母的ASCII值
break
}
}
fmt.Printf("%c",s)
}
if n>10{
break
}
}
fmt.Println("")
}
例子2:多线程处理复杂计算任务的worker模型
其基本思想如下图:
worker模型
package main
// 本程序模拟多个线程(worker)共同完成多个任务(产生随机字符)
import (
"fmt"
"time"
"math/rand"
)
func random_char() int{
var s int
for{
s = rand.Intn(123)
if (s>64 && s<91)|| (s>96 && s<123){ // 字母的ASCII值
break
}
}
return s
}
func worker(id int, jobs <-chan int, results chan<- int){
// heavy work goes in here
for n := range jobs {
time.Sleep(time.Duration(n*50) * time.Millisecond)
fmt.Printf("worker %d finished\n",id)
results<- random_char()
}
}
func main(){
n_threads:=5;// 线程数
n_jobs:=1000;// 任务数
var jobs_list =make([]int,n_jobs)
var results_list = make([]int, n_jobs)
for i:=0;i<n_jobs;i++{
// 生成任务
jobs_list[i]=rand.Intn(3)
}
//开启通道
buffer_size:=10
jobs :=make(chan int,buffer_size)
results:=make(chan int,buffer_size)
rand.Seed(time.Now().Unix())
for w := 1; w <= n_threads; w++ {
// 开启多个线程
go worker(w,jobs, results)
}
// 分发第一批任务
var i int // i为分发掉任务的计数器
for i=0; i<buffer_size;i++ {
jobs <- jobs_list[i]
}
finished :=0
for {
// 当全部完成时,停止
if finished >= n_jobs{
break
}
//循环监测任务的完成情况, 当有任务完成时, 再次分发没有完成的任务
select{
case r:=<-results:
results_list[finished]=r
finished ++
if i<n_jobs{
jobs <- jobs_list[i] //分发没有完成的任务
i++;
}
}
}
close(jobs)
close(results)
for i=0;i<len(results_list);i++{
fmt.Printf("%c",results_list[i])
}
fmt.Println("")
}
参考
Go Programming language
Go实现线程池
Go by example
初学者,请多指教
原创内容,转载请注明 copywrite by threadtag