GO阅读-同步编程-channel和select
非原创,搬运工
环境
go1.20
dlv1.9.x
1.创建通道makechan
后续就以该demo代码进行逻辑梳理
func main() {
c := make(chan int)
c1 := make(chan int, 5)
close(c)
close(c1)
fmt.Println("vim-go")
}
对变量c定义的那一行打断点
dlv debug main.go
b main.go:6
c
disass
channel汇编代码
通过查看汇编代码我们发现实际上是调用runtime.makechan方法(在之前学习make方法的时候我们就知道底层是这样调用的,现在是验证)
runtime/chan.go/makechan函数的定义
func makechan(t *chantype, size int) *hchan {
}
这个方法有点难看懂,但是看签名可以知道,最后得到了一个hchan的结构体(1.17和1.20没区别)
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
- qcount chan中元素个数
- datasize chan中的循环链表长度,就是make创建指定的长度
- buf chan的缓冲区数据指针
- elemtype 接受元素类型
- elemtsize 接收单个元素大小
- sendx 发送指针位置
- recvx 接收指针位置
- sendq和recvq 使用双向链(环状)表用于存储等待的gorutine
我们再回头看makechan做的事,根据传入通道元素和通道大小,分配hchan和缓冲区内存,再初始化hchan成员
func makechan(t *chantype, size int) *hchan {
// elem是channel元素,这里我们是chan int
elem := t.elem
...一系列参数的边界和限制校验...
//math.MulUintptr是判断申请的内存空间会不会超过最大申请限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
......
var c *hchan
switch {
//无缓冲区情况分支,只给hchan分配一段内存
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
//channel不包含指针的情况
//给hchan和缓冲区c.buf分配内存
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
//上面两种情况内存都是连续的
//默认是单独给hchan和缓冲区分配内存
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
//locakinit不太重要,根据函数名字可以推测是锁初始化
lockInit(&c.lock, lockRankHchan)
return c
}
func (c *hchan) raceaddr() unsafe.Pointer {
return unsafe.Pointer(&c.buf)
}
1.1小结
makechan无非是干了两件数:1.参数校验和初始化hchan结构体,其中一共初始化了4个字段:buf,elemsize,elemtype,dataqsize
1.2另一种方法查看汇编
ssa是编译器经过优化生成的中间代码,在网页写代码之后直接查看,跟dlv一样,只是多一种同类型工具
2.数据发送chansend
查看底层汇编
image.png ssa网页查看结果
chan的发送实际上是调用runtime.chansend1方法(是chansend的封装)
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
block为true时表示发送操作是阻塞的
chansend方法:
前面一堆参数验证
lock(&c.lock)
//发送数据前加锁(所以channel发送是线程安全的),
//验证通道是否关闭
//这里可以得知往已经关闭的通道发送会引发panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
后面代码是分别讨论发送数据时遇到的情况
2.1 直接发送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
这种情况是投递数据时,刚好有人在等待,那么直接调用send方法投递给他
关于recvq是用于存储等待的gorutine的,是waitq类型,可以看到waitq是一个链表/队列结构
type waitq struct {
first *sudog
last *sudog
}
waitq.dequeue逻辑很简单,就是递归读取链表
func (q *waitq) dequeue() *sudog {
for {
//先获取头部元素,这个是最后要被弹出去的
//头部都没有就说明是个空链表
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
//弹出头部后整理链表,更新first
//没有下一个就意味着弹出头部后是个空链表
//有就将下一个作为头部元素
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil
}
//如果第一个不满足条件就扔掉重新来
if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
continue
}
return sgp
}
}
回到我们场景1调用send方法,主要逻辑是以下部分
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//拷贝变量地址到接收端的内存地址
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
主要干了以下两件事情:
- 调用sendDirect将发送数据直接拷贝到 接收channel通道元素的变量内存地址上去
即 x = c <-a ,拷贝到x内存地址上
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//memove是内存拷贝
memmove(dst, src, t.size)
}
- 调用goready
//这里进行协程调度是为了唤醒那个在等待数据的协程
func goready(gp *g, traceskip int) {
// 切换到g0的栈
systemstack(func() {
ready(gp, traceskip, true)
})
}
该函数主要功能就是切换的g0栈执行ready方法,核心方法是gorutine状态切换到runable,放入队列等待P调度,这个在之前学习GMP的时候已经很熟悉了,这就是唤醒协程(这里是数据接收方)的方法(注意是放入runnext,并没有立即执行,要等待P调度)
至此发送数据的第一种情况分析完毕:若有等待协程则直接发送,具体发送过程是将元素拷贝给接收变量内存地址,唤醒数据接收方进入协程调度 发送过程2.2缓冲区未满
回到chansend
若没有等待协程则放入缓冲区
//datasize缓冲区大小,qcount为放入元素数量
//进入该分支时此时缓冲区未满,且接收端没人等
if c.qcount < c.dataqsiz {
//计算出下一个存储数据位置,sendx是发送指针位置
qp := chanbuf(c, c.sendx)
//将刚进入channel的元素发送到计算出的位置
typedmemmove(c.elemtype, qp, ep)
//发送指针自增
c.sendx++
// 缓冲区满了,
//重置sendx
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
qcount<datasize即buf中还有剩余空间,利用chanbuf计算出下一个存储数据的位置,再利用typedmemove将发送数据拷贝到计算出的位置,发送指针sendx+1,通道元素数量qcount+1
2.3关于阻塞发送
走到这里说明是条件都不满足:
1.没有缓冲区,没人消费
2.缓冲区大小不够,没人消费
需要阻塞当前协程
//block是传入的参数,是否要阻塞
//上面那两条件出现明显说明要休眠等待元素被消费了
// 如果block还是为false ,要求不阻塞
//那就直接返回
if !block {
unlock(&c.lock)
return false
}
//func getg() *g尝试获取发送数据使用的协程
//就是自己了没错
gp := getg()
//获取sudog结构体并设置一次阻塞发送的配置
//将要发送的元素进行单独特例包装
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//扔到sendq链表里面去
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//你个协程睡觉去吧你
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
acquireSudog返回一个sudog结构体,并设置这一次阻塞发送的相关信息,包括发送的协程,是否在select中发送等
sendq.enqueue就是和recvq的dequeue相反,是将其入队,等待条件满足唤醒
调用gopark切走协程主动让出cpu执行权限
剩下代码就是被唤醒后做一些收尾的工作,如释放内存,return true表示成功发送数据
小结
至此通过阅读chansend源码,数据发送(chan<-i)的三种情况已经分析完毕:
1.刚好有协程在recvq中等待则直接给他
2.缓冲区有空位置(qcount<datasize)就放上去
3.都不满足就创建sudo结构体,包装元素,扔到sendq链表中,阻塞当前协程等待被唤醒
发送数据时触发Gorutine调度的几个时机:
1.直接发送时会唤醒协程(send调用goready),等待调度
2.发送数据时没找到接收方且缓存已满就将自己入队等待调度(gopark)
3.接收数据
ssa截图本质上是调用runtime.chanrecv1(是对chanrecv的封装),
函数签名
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
chanrecv先对通道进行验证
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
//这个是1.20新增的代码,但是总体逻辑还是一样的
//如果通道被关闭且缓冲区还有数据
//还能继续接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
如果c已经被关闭,且不存在任何数据就返回true和false,也就是说有这样代码
data,ok := chan<-
ok = false 代表的是关闭且没有数据 ,ok = true代表关闭但是可能还有数据
同理接收数据分上面三种情况
3.1直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
这里要注意recv会根据缓冲区大小分别处理
- 不存在则调用recvDirect从sendq队列上取
- 存在则将数据拷贝到内存,将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方
3.2缓冲区寻找
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
逻辑类似上面发送代码,存到缓冲区中
3.3阻塞接收
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
还是获取接收协程信息,sudog包装入队然后阻塞,最后收尾工作
4.关闭通道
调用closechan方法 image.png前面是异常处理,跳过
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
清除sendq上的未被处理协程,最后为所有被阻塞的调用goready触发调度
5.select
在上面分析阻塞接收和阻塞发送时,我们都遇到结构体sudo(进入recvq或sendq队列的结构体,保存协程信息),它有一个字段叫做isSelect,判断当前执行环境是否在select中,其实在学习select的时候我们就知道了,我们常将select和channel搭配使用,这也说明了两者代码是存在一定关系的。
但是当我们想使用dlv对select打断点时发现有些没有效果,这段代码会在编译阶段被优化成其他!
5.1无case的select
select{}
image.png
这个可以通过dlv捕捉到,内部实现是runtime.block方法
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
}
同时内部也调用gopark(让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases,上文的非阻塞发送有遇到过)进行永久阻塞
5.2单一channel
demo.go
func main() {
c := make(chan int)
x := 0
select {
case c <- x:
fmt.Println("hi")
}
}
image.png
实际上是直接被编译器优化成接收方法,并没有select的痕迹
5.3单一channel+default
demo.go
select {
case c <- x:
fmt.Println("hi")
default:
fmt.Println("bye")
}
image.png
调用selectnbsend,将c<-x改为 <-c
image.png
变成了调用selectnbrecv
阻塞发送的情况
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
内部是对chansend和chanrecv的封装,区别在于传入的block=false,还记得上文分析过,当刚好没有等待协程且缓冲区已满时会进入block判断分支:
//chansend函数
if sg:= c.recvq.dequeue
....
if c.qucount < c.dataqsize
...
if !block {
unlock(&c.lock)
return false
}
也就是说遇到上面情况(没有消费协程且没缓冲区)会快速失败,而不是继续向下执行阻塞协程,来让select后面的代码有可执行机会
再复杂点,多路复用(多个channel)
demo.go
select {
case c <- x:
fmt.Println("hi")
case <-c:
fmt.Println(x)
default:
fmt.Println(x)
}
image.png
变成了selectgo函数,这是select里面的重头戏(位于runtime/select.go)
先来看函数签名
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
- case0是一个类型为[ncases]scase的数组
- order0 是一个指向[2*ncases]uint16数组,值都为0
为什么 selectgo 还需要传递一个 order0,而不是直接根据 ncase 直接分配呢
编译转换会使用 temp 函数来构造生成数组的语句,而这个语句便可以保证数据会分配到栈上,而不是堆上,避免了不必要的堆分配
- selectgo 会返回选中的序号,如果是个接收,还会返回是否接收到一个值
select在go语言不存在相应的结构体,但是使用的分支case在go中使用scase结构体表示
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
- c 类型hchan已经很熟悉了,就是一个通道的结构体,来存储case中使用的channel
回到selectgo函数
//将case0数组和order0转为slice结构
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// [:n:n]的方式会让slice的len和cap相等
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
下面看不懂没关系,我也看不懂索性直接copy,只要知道它是随机选取分支就好
首先会进行执行必要的初始化,决定处理case的两个顺序 :
- 轮询顺序pollorder
- 加锁顺序 lockorder
order1会被分为pollorder和lockorder,这两个slice将会真正决定select的随机选择以及死锁问题
norder := 0
for i := range scases {
cas := &scases[i]
//对于channel为nil的收发操作,将他们的elem设置为nil
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
// fastrabdb为生成随机数函数
//porder刚开始是0,循环结束后是随机顺序的scases索引
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
....
sellock(scases, lockorder)
....
这里的轮询顺序pollorder是随机的,避免channel的饥饿问题,保证公平性,之后根据channel的地址顺序确定加锁顺序来避免死锁发生(sellock函数)
如果多个 goroutine 都需要锁定 ch1 ch2,而他们加锁的顺序不固定,那么很可能会出现死锁问题
这个时候,对加锁的顺序就有要求了,按照同样的顺序的话,没有竞争到 ch1.lock 的 goroutine,会等待加锁 ch1.lcok,而不会直接去加锁 ch2.lock
func sellock(scases []scases, lockorder []int16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[0].c // 根据加锁顺序获取 case
// c 记录了上次加锁的 hchan 地址,如果和当前 *hchan 相同,那么就不会再次加锁
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}
加锁完成,进入selectgo主循环逻辑
第一阶段的主要职责是查找所有 case 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.c = c
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
...
}
如果不能立刻找到活跃的 Channel 就会进入循环的下一阶段,按照需要将当前 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中
除了将当前 Goroutine 对应的 runtime.sudog
结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark
挂起当前 Goroutine 等待调度器的唤醒。
等到 select
中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo
函数的第三部分,从 runtime.sudog
中读取数据:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
sglist = gp.waiting
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
casi = int(casei)
cas = k
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
c = cas.c
goto retc
...
}
第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。
由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。
当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecv 和 bufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
这里在缓冲区进行的操作和直接调用 runtime.chansend
和 runtime.chanrecv
差不多,上述两个过程在执行结束之后都会直接跳到 retc
字段。
两个直接收发 Channel 的情况会调用运行时函数 runtime.send
和 runtime.recv
,这两个函数会与处于休眠状态的 Goroutine 打交道:
recv:
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
两个直接收发 Channel 的情况会调用运行时函数 runtime.send
和 runtime.recv
,这两个函数会与处于休眠状态的 Goroutine 打交道:
recv:
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:
- 从一个关闭 Channel 中接收数据会直接清除 Channel 中的相关内容(1.20版本需要判断有无缓冲区);
- 向一个关闭的 Channel 发送数据就会直接 panic 造成程序崩溃:
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。
参考
1.go语言channel
2.go 深入刨析
3.go ready函数
4.go夜读