并发进阶
前面讲述了一些Go语言中并发的基础内容,今天来讲一下Go语言并发的进阶内容。
多核并行化
我们在执行并行计算的时候,虽然我们使用了多个goroutine
,但这并不意味着执行的时间就会降低,这是因为当前版本的Go语言编译器还不能很智能的去发现和利用多核的优势,我们所设定的多个goroutine
,他们都是在一个CPU上执行的,在一个goroutine
得到时间片执行的时候,其他goroutine
都会处于等待状态。
但是我们可以自己进行设置环境变量GOMAXPROCS
来控制使用多少个CPU核心,具体代码如下:
runtime.GOMAXPROCS(16)
但我们到底该设置多少个核心呢,其实在runtime
包中还有一个NumCPU()
函数可以帮助我们获取核心CPU的数量。
协程同步
通道是可以被关闭的,但他们并不是每次在执行时都需要关闭,只有发送者才需要关闭通道,接受者永远都不会需要。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go sendData(ch)
go getData(ch)
time.Sleep(1e9)
}
func sendData(ch chan string) {
ch <- "New York"
ch <- "Beijing"
ch <- "Tokyo"
ch <- "Paris"
ch <- "London"
}
func getData(ch chan string) {
var input string
for {
input = <- ch
fmt.Printf("%s ", input)
}
}
/** The result is:
New York Beijing Tokyo Paris London
*/
那么问题是如何在通道的sendData()
完成时发送一个信号,getData()
又如何检测到通道是否关闭或阻塞?
第一种方法是使用close(ch)
函数:
ch := make(chan float64)
defer close(ch)
第二种方法可以使用逗号,ok的操作符:
v, ok := <-ch // 如果v接收到一个值,ok的值则为true
通常和if
语句一同使用
if v, ok := <-ch; ok {
process(v)
}
或者是当关闭或者阻塞的时候使用break
:
v, ok := <-ch
if !ok {
break
}
process(v)
当然,我们还可以使用_ = ch <- v
来实现非阻塞发送,因为空标识符获取到了发送给ch
的所有内容。
根据以上的几种方法,我们对前面的程序进行一定的修改:
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
go sendData(ch)
getData(ch)
}
func sendData(ch chan string) {
ch <- "New York"
ch <- "Beijing"
ch <- "Tokyo"
ch <- "Paris"
ch <- "London"
close(ch)
}
func getData(ch chan string) {
for {
input, open := <- ch
if !open {
break
}
fmt.Printf("%s ", input)
}
}
/** The result is:
New York Beijing Tokyo Paris London
*/
输出的结果肯定是和前面是一样的,但是我们对原有程序做了如下修改:
1)现在只有sendData()
是协程,getData()
和main()
在同一线程中
2)在sendData()
函数的最后,我们通过close(ch)
关闭了通道
3)在for
循环的getData()
中,在每次接受通道的数据之前都使用if !open
来进行检测
当然,使用for range
语句来读取通道是一个更好的办法,因为这会自动检测通道是否已经关闭:
for input := range ch {
process(input)
}
阻塞和生产者-消费者模式:在通道迭代中,两个协程经常是一个阻塞另外一个。如果程序工作在多核的机器上,大部分时间只用到一个处理器。可以通过使用带缓冲的通道来进行完善。
协程的恢复
在协程工作的过程中,如果其中的一个协程因为失败而停止了,但他并不会影响其他协程的工作。
func server(workchan <-chan *Work) {
for work := range workchan {
go safelyDo(work) // goroutine开始工作
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("%s failed in %v", err, work)
}
}()
do(work)
}
比如在这个程序当中,如果do(work)
发生了panic
,错误就会被记录且协程会退出并释放,而其他协程不受影响。而且这段代码中safelyDo(work *Work)
函数中加入了恢复模式,函数do
可以通过调用panic
来摆脱不好的情况。但是恢复是在panic
的协程内部的,它并不能被另一个协程恢复。