Go语言基础6 - 并发
概述
我们将用几节来学习Go语言基础,本文结构如下:
1. 并发
通过通信共享内存
Go程
信道
信道中的信道
并行化
可能泄露的缓冲区
2. 错误
Panic
恢复
1. 并发
1.1 通过通信共享内存
在并发编程中,为实现对共享变量的正确访问需要精确的控制,这在多数环境下都很困难。
实际上,多个独立执行的线程从不会主动共享。Go语言另辟蹊径,它将共享的值通过信道传递, 在任意给定的时间点,只有一个Go程能够访问该值,数据竞争从设计上就被杜绝了。
例如,引用计数通过为整数变量添加互斥锁来很好地实现。 取而代之的是,通过信道来控制访问能够让你写出更简洁的程序。
Go将它简化为一句口号:
不要通过共享内存来通信,而应通过通信来共享内存。
1.2 Go程
Go程具有简单的模型:
- 它是与其它Go程并发运行在同一地址空间的函数。
- 它是轻量级的, 消耗几乎就只有栈空间的分配。
- 而且栈最开始是非常小的,所以它们很廉价, 仅在需要时才会随着堆空间的分配(和释放)而变化。
Go程在多线程操作系统上可实现多路复用,因此若一个线程阻塞,比如说等待I/O, 那么其它的线程就会运行。
Go程的设计隐藏了线程创建和管理的诸多复杂性。
在函数或方法前添加 go 关键字能够在新的Go程中调用它。当调用完成后, 该Go程也会安静地退出,示例:
go list.Sort() // 并发运行 list.Sort,无需等它结束。
函数字面在Go程调用中非常有用。
备注:可理解 为匿名函数的调用。下面的方法先声明了一个匿名方法,然后立即调用。
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println(message)
}() // 注意括号 - 必须调用该函数。
}
在Go中,函数字面都是闭包:其实现在保证了函数内引用变量的生命周期与函数的活动时间相同。
1.3 信道( chan )
1.3.1 格式: make(chan int)
信道与映射一样,也需要通过 make 来分配内存,make 后的返回值是对底层数据结构的引用。
若提供了一个可选的整数形参,它就会为该信道设置缓冲区大小。
缓冲区大小的默认值是零,表示不带缓冲的或同步的信道。
示例:
ci := make(chan int) // 整数类型的无缓冲信道
cj := make(chan int, 0) // 整数类型的无缓冲信道
cs := make(chan *os.File, 100) // 指向文件指针的带缓冲信道
无缓冲信道在通信时会同步交换数据,它能确保(两个Go程的)计算处于确定状态。
1.3.2 阻塞等待Go程( 无缓冲区的示例 )
示例:使用 go 程,在后台启动了排序操作,等待排序完成。
c := make(chan int) // 分配一个信道
// 在Go程中启动排序。当它完成后,在信道上发送信号。
go func() {
list.Sort()
c <- 1 // 发送信号,什么值无所谓。
}()
doSomethingForAWhile()
<-c // 等待排序结束,丢弃发来的值。
- 接收者在收到数据前会一直阻塞。
- 若信道是不带缓冲的,那么在接收者收到值前, 发送者会一直阻塞;
- 若信道是带缓冲的,则发送者仅在值被复制到缓冲区前阻塞;
- 若缓冲区已满,发送者会一直等待直到某个接收者取出一个值为止。
1.3.3 控制吞吐量的例子( 带缓冲的示例 )
带缓冲的信道可被用作信号量。例如限制吞吐量。
示例:
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // 等待活动队列清空。 #2 占据,阻塞
process(r) // 可能需要很长时间。
<-sem // 完成;使下一个请求可以运行。 #3 解除占据
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // 无需等待 handle 结束。 #1 每个请求对应一个 Go程
}
}
上面的例子中:
- 进入的请求 req 会被传递给 handle。
- handle 中 #2 等待一个信号继续(当缓冲区满时)
- handle 中 #3 后,发送信号,使得 被阻塞的另一个 go程 开始进入到process
- 信道缓冲区的容量决定了同时调用 process 的数量上限
备注:
这个示例一次开始了全部多个go程,然后根据缓冲区大小阻塞等待,当缓冲区可以进入时继续进行。
1.3.4 继续改良的例子( 采用匿名方法 )
若请求来得很快, 上面的程序就会无限地消耗资源。为了弥补这种不足,我们可以通过修改 Serve 来限制创建Go程:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // 这儿有Bug,解释见下。
<-sem
}()
}
}
Bug出现在Go的 for 循环中,该循环变量在每次迭代时会被重用,因此 req 变量会在所有的Go程间共享,这不是我们想要的。我们需要确保 req 对于每个Go程来说都是唯一的。
有一种方法能够做到,就是将 req 的值作为实参传入到该Go程的闭包中:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
闭包的处理
比较前后两个版本,观察该闭包声明和运行中的差别。 另一种解决方案就是以相同的名字创建新的变量,如例中所示:
func Serve(queue chan *Request) {
for req := range queue {
req := req // 为该Go程创建 req 的新实例。
sem <- 1
go func() {
process(req)
<-sem
}()
}
}
它的写法看起来有点奇怪
req := req
但在Go中这样做是合法且惯用的。你用相同的名字获得了该变量的一个新的版本, 以此来局部地刻意屏蔽循环变量,使它对每个Go程保持唯一。
1.3.5 固定数据的go程,同时读取
另一种管理资源的好方法:
-
启动固定数量的 handle Go程,一起从请求信道中读取数据。
-
Go程的数量限制了同时调用 process 的数量。
-
Serve 同样会接收一个通知退出的信道, 在启动所有Go程后,它将阻塞并暂停从信道中接收消息。
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}func Serve(clientRequests chan *Request, quit chan bool) {
// 启动处理程序,固定数量
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // 等待通知退出。
}
1.4 信道中的信道
这种特性通常被用来实现安全、并行的多路分解。
在上一节的例子中,handle 是个非常理想化的请求处理程序, 但我们并未定义它所处理的请求类型。若该类型包含一个可用于回复的信道, 那么每一个客户端都能为其回应提供自己的路径。以下为 Request 类型的大概定义。
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
客户端提供了一个函数及其实参,此外在请求对象中还有个接收应答的信道。
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待回应
fmt.Printf("answer: %d\n", <-request.resultChan)
服务端的处理
On the server side, the handler function is the only thing that changes.
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
1.5 并行化
这些设计的另一个应用是在多CPU核心上实现并行计算。如果计算过程能够被分为几块 可独立执行的过程,它就可以在每块计算结束时向信道发送信号,从而实现并行处理。
1.6 可能泄露的缓冲区
--
2. 错误
Go语言具有多值返回特性, 使得它可以在返回常规的值,和详细的错误描述。
按照约定,错误的类型通常为 error,这是一个内建的简单接口。
type error interface {
Error() string
}
库的编写者通过更丰富的底层模型可以轻松实现这个接口,这样不仅能看见错误, 还能提供一些上下文。
例如,os.Open 可返回一个 os.PathError。
/* 定义结构体 */
// PathError 记录一个错误以及产生该错误的路径和操作。
type PathError struct {
Op string // "open"、"unlink" 等等。
Path string // 相关联的文件。
Err error // 由系统调用返回。
}
/* 实现 Error接口 */
func (e *PathError) Error() string {
return e.Op + " " + e.Path + ": " + e.Err.Error()
}
这样,PathError的 Error 会生成如下错误信息:
open /etc/passwx: no such file or directory
错误字符串应尽可能地指明它们的来源,解释清楚错误的情况。
若调用者想知道更多细节,可使用类型选择或者类型断言来查看特定错误,和处理。
for try := 0; try < 2; try++ {
file, err = os.Create(filename)
if err == nil {
return
}
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
deleteTempFiles() // 恢复一些空间。
continue
}
return
}
上面的第5行,即第2条 if 是另一种类型断言。
若它失败, ok 将为 false,而 e 则为nil. 若它成功, ok 将为 true
2.1 Panic
有时程序就是不能继续运行。为此,可以使用内建的 panic 函数,它会产生一个运行时错误并终止程序。
该函数接受一个任意类型的实参(一般为字符串),并在程序终止时打印输出。格式:
Panic( 字符串 )
实际使用中,库函数应避免 panic。若问题可以被屏蔽或解决, 最好就是让程序继续运行,而不是终止。
一个反例的情况就是初始化中: 若某个库真的不能让自己工作,那就触发Panic 吧,比如
var user = os.Getenv("USER")
func init() {
if user == "" {
panic("no value for $USER")
}
}
2.2 恢复
当 panic 被调用后, 程序将立刻终止当前函数的执行,并开始回溯Go程的栈,运行任何被推迟的函数。 若回溯到达Go程栈的顶端,程序就会终止。
// 我自己画的不太严谨的图例,帮助理解。
// 假如在 main 函数里调用了 方法1,在 方法1 里又调用了 方法2
| |
| |
| #4 方法2 | // 假如在这里触发了 Panic
| #3 方法2的defer | //在 defer 时,仍然有机会调用 recover函数来恢复
| #2 方法1 |
| #1 main | //到这里就程序终止了
-------------------------------
不过我们可以用内建的 recover 函数来 取回Go程的控制权限 并使其恢复正常执行。
调用 recover函数 将停止回溯过程,它的返回值是错误信息(实际是调用 panic 函数时的参数)。
由于在回溯时,只有被推迟的函数( defer )在运行,因此 recover 只能在被推迟(defer)的函数中才有效。
在 Go程 内通过 recover 来终止失败的Go程,而无需让整个程序崩溃。
先看示例代码:
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work)
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Println("work failed:", err)
}
}()
do(work)
}
在此例中,若 do(work) 触发了Panic,其结果就会被记录(打印输出), 而该Go程会被干净利落地结束,不会干扰到其它Go程。我们无需在推迟的闭包中做任何事情, recover 会处理好这一切。