GO阅读-同步编程-channel和select

2022-08-11  本文已影响0人  温岭夹糕

非原创,搬运工

环境

go1.20
dlv1.9.x

1.创建通道makechan

后续就以该demo代码进行逻辑梳理

func main() {
    c := make(chan int)
    c1 := make(chan int, 5)
    close(c)
    close(c1)
    fmt.Println("vim-go")
}

对变量c定义的那一行打断点

dlv debug main.go
b main.go:6
c
disass
channel汇编代码
通过查看汇编代码我们发现实际上是调用runtime.makechan方法(在之前学习make方法的时候我们就知道底层是这样调用的,现在是验证)
runtime/chan.go/makechan函数的定义
func makechan(t *chantype, size int) *hchan {
}

这个方法有点难看懂,但是看签名可以知道,最后得到了一个hchan的结构体(1.17和1.20没区别)

type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters

        lock mutex
}

我们再回头看makechan做的事,根据传入通道元素和通道大小,分配hchan和缓冲区内存,再初始化hchan成员

func makechan(t *chantype, size int) *hchan {
// elem是channel元素,这里我们是chan int
        elem := t.elem

        ...一系列参数的边界和限制校验...
//math.MulUintptr是判断申请的内存空间会不会超过最大申请限制
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
       ......  

        var c *hchan
        switch {
//无缓冲区情况分支,只给hchan分配一段内存
        case mem == 0:
                c = (*hchan)(mallocgc(hchanSize, nil, true))
                c.buf = c.raceaddr()
//channel不包含指针的情况
//给hchan和缓冲区c.buf分配内存
        case elem.ptrdata == 0:
                c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
                c.buf = add(unsafe.Pointer(c), hchanSize)
//上面两种情况内存都是连续的
//默认是单独给hchan和缓冲区分配内存
        default:
                c = new(hchan)
                c.buf = mallocgc(mem, elem, true)
        }

        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
//locakinit不太重要,根据函数名字可以推测是锁初始化
        lockInit(&c.lock, lockRankHchan)

        return c
}

func (c *hchan) raceaddr() unsafe.Pointer {
        return unsafe.Pointer(&c.buf)
}

1.1小结

makechan无非是干了两件数:1.参数校验和初始化hchan结构体,其中一共初始化了4个字段:buf,elemsize,elemtype,dataqsize

1.2另一种方法查看汇编

ssa是编译器经过优化生成的中间代码,在网页写代码之后直接查看,跟dlv一样,只是多一种同类型工具

2.数据发送chansend

查看底层汇编


image.png ssa网页查看结果
chan的发送实际上是调用runtime.chansend1方法(是chansend的封装)
func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

block为true时表示发送操作是阻塞的
chansend方法:

前面一堆参数验证
     lock(&c.lock)
//发送数据前加锁(所以channel发送是线程安全的),
//验证通道是否关闭
//这里可以得知往已经关闭的通道发送会引发panic
        if c.closed != 0 {
                unlock(&c.lock)
                panic(plainError("send on closed channel"))
        }

后面代码是分别讨论发送数据时遇到的情况

2.1 直接发送

     if sg := c.recvq.dequeue(); sg != nil {
                send(c, sg, ep, func() { unlock(&c.lock) }, 3)
                return true
        }

这种情况是投递数据时,刚好有人在等待,那么直接调用send方法投递给他
关于recvq是用于存储等待的gorutine的,是waitq类型,可以看到waitq是一个链表/队列结构

type waitq struct {
        first *sudog
        last  *sudog
}

waitq.dequeue逻辑很简单,就是递归读取链表

func (q *waitq) dequeue() *sudog {
        for {
//先获取头部元素,这个是最后要被弹出去的
//头部都没有就说明是个空链表
                sgp := q.first
                if sgp == nil {
                        return nil
                }
                y := sgp.next
//弹出头部后整理链表,更新first
//没有下一个就意味着弹出头部后是个空链表
//有就将下一个作为头部元素
                if y == nil {
                        q.first = nil
                        q.last = nil
                } else {
                        y.prev = nil
                        q.first = y
                        sgp.next = nil 
                }
//如果第一个不满足条件就扔掉重新来
                if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
                        continue
                }

                return sgp
        }
}

回到我们场景1调用send方法,主要逻辑是以下部分

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//拷贝变量地址到接收端的内存地址
        if sg.elem != nil {
                sendDirect(c.elemtype, sg, ep)
                sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {
                sg.releasetime = cputicks()
        }
        goready(gp, skip+1)

主要干了以下两件事情:

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//memove是内存拷贝
        memmove(dst, src, t.size)
}
//这里进行协程调度是为了唤醒那个在等待数据的协程
func goready(gp *g, traceskip int) {
    // 切换到g0的栈
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

该函数主要功能就是切换的g0栈执行ready方法,核心方法是gorutine状态切换到runable,放入队列等待P调度,这个在之前学习GMP的时候已经很熟悉了,这就是唤醒协程(这里是数据接收方)的方法(注意是放入runnext,并没有立即执行,要等待P调度)

至此发送数据的第一种情况分析完毕:若有等待协程则直接发送,具体发送过程是将元素拷贝给接收变量内存地址,唤醒数据接收方进入协程调度 发送过程

2.2缓冲区未满

回到chansend
若没有等待协程则放入缓冲区

//datasize缓冲区大小,qcount为放入元素数量
//进入该分支时此时缓冲区未满,且接收端没人等
        if c.qcount < c.dataqsiz {
//计算出下一个存储数据位置,sendx是发送指针位置
                qp := chanbuf(c, c.sendx)
//将刚进入channel的元素发送到计算出的位置
                typedmemmove(c.elemtype, qp, ep)
//发送指针自增
                c.sendx++
// 缓冲区满了,
//重置sendx
                if c.sendx == c.dataqsiz {
                        c.sendx = 0
                }
                c.qcount++
                unlock(&c.lock)
                return true
        }

qcount<datasize即buf中还有剩余空间,利用chanbuf计算出下一个存储数据的位置,再利用typedmemove将发送数据拷贝到计算出的位置,发送指针sendx+1,通道元素数量qcount+1

2.3关于阻塞发送

走到这里说明是条件都不满足:
1.没有缓冲区,没人消费
2.缓冲区大小不够,没人消费
需要阻塞当前协程

//block是传入的参数,是否要阻塞
//上面那两条件出现明显说明要休眠等待元素被消费了
// 如果block还是为false ,要求不阻塞
//那就直接返回
    if !block {
        unlock(&c.lock)
        return false
    }
//func getg() *g尝试获取发送数据使用的协程
//就是自己了没错
    gp := getg()
//获取sudog结构体并设置一次阻塞发送的配置
//将要发送的元素进行单独特例包装
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
//扔到sendq链表里面去
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
//你个协程睡觉去吧你
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

acquireSudog返回一个sudog结构体,并设置这一次阻塞发送的相关信息,包括发送的协程,是否在select中发送等
sendq.enqueue就是和recvq的dequeue相反,是将其入队,等待条件满足唤醒
调用gopark切走协程主动让出cpu执行权限
剩下代码就是被唤醒后做一些收尾的工作,如释放内存,return true表示成功发送数据

小结

至此通过阅读chansend源码,数据发送(chan<-i)的三种情况已经分析完毕:
1.刚好有协程在recvq中等待则直接给他
2.缓冲区有空位置(qcount<datasize)就放上去
3.都不满足就创建sudo结构体,包装元素,扔到sendq链表中,阻塞当前协程等待被唤醒
发送数据时触发Gorutine调度的几个时机:
1.直接发送时会唤醒协程(send调用goready),等待调度
2.发送数据时没找到接收方且缓存已满就将自己入队等待调度(gopark)

3.接收数据

ssa截图

本质上是调用runtime.chanrecv1(是对chanrecv的封装),
函数签名

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

chanrecv先对通道进行验证

    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    lock(&c.lock)

    if c.closed != 0 {
        if c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
//这个是1.20新增的代码,但是总体逻辑还是一样的
//如果通道被关闭且缓冲区还有数据
//还能继续接收
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }

如果c已经被关闭,且不存在任何数据就返回true和false,也就是说有这样代码

data,ok := chan<-

ok = false 代表的是关闭且没有数据 ,ok = true代表关闭但是可能还有数据
同理接收数据分上面三种情况

3.1直接接收

    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

这里要注意recv会根据缓冲区大小分别处理

3.2缓冲区寻找

    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)

        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

逻辑类似上面发送代码,存到缓冲区中

3.3阻塞接收

    if !block {
        unlock(&c.lock)
        return false, false
    }

    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

还是获取接收协程信息,sudog包装入队然后阻塞,最后收尾工作

4.关闭通道

调用closechan方法 image.png

前面是异常处理,跳过

c.closed = 1

    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
    }

    for {
        sg := c.sendq.dequeue()
        ...
    }
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

清除sendq上的未被处理协程,最后为所有被阻塞的调用goready触发调度

5.select

在上面分析阻塞接收和阻塞发送时,我们都遇到结构体sudo(进入recvq或sendq队列的结构体,保存协程信息),它有一个字段叫做isSelect,判断当前执行环境是否在select中,其实在学习select的时候我们就知道了,我们常将select和channel搭配使用,这也说明了两者代码是存在一定关系的。
但是当我们想使用dlv对select打断点时发现有些没有效果,这段代码会在编译阶段被优化成其他!

5.1无case的select

select{}
image.png

这个可以通过dlv捕捉到,内部实现是runtime.block方法

func block() {
    gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
}

同时内部也调用gopark(让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases,上文的非阻塞发送有遇到过)进行永久阻塞

5.2单一channel

demo.go

func main() {
    c := make(chan int)
    x := 0
    select {
    case c <- x:
        fmt.Println("hi")
    }
}
image.png

实际上是直接被编译器优化成接收方法,并没有select的痕迹

5.3单一channel+default

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    default:
        fmt.Println("bye")
    }
image.png

调用selectnbsend,将c<-x改为 <-c


image.png

变成了调用selectnbrecv
阻塞发送的情况

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
}

内部是对chansend和chanrecv的封装,区别在于传入的block=false,还记得上文分析过,当刚好没有等待协程且缓冲区已满时会进入block判断分支:

//chansend函数
if sg:= c.recvq.dequeue
   ....
if c.qucount < c.dataqsize
  ...

        if !block {
                unlock(&c.lock)
                return false
        }

也就是说遇到上面情况(没有消费协程且没缓冲区)会快速失败,而不是继续向下执行阻塞协程,来让select后面的代码有可执行机会

再复杂点,多路复用(多个channel)

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    case <-c:
        fmt.Println(x)
    default:
        fmt.Println(x)
    }

image.png

变成了selectgo函数,这是select里面的重头戏(位于runtime/select.go)
先来看函数签名

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

为什么 selectgo 还需要传递一个 order0,而不是直接根据 ncase 直接分配呢
编译转换会使用 temp 函数来构造生成数组的语句,而这个语句便可以保证数据会分配到栈上,而不是堆上,避免了不必要的堆分配

select在go语言不存在相应的结构体,但是使用的分支case在go中使用scase结构体表示

type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}
//将case0数组和order0转为slice结构
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
   // [:n:n]的方式会让slice的len和cap相等
    ncases := nsends + nrecvs
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

下面看不懂没关系,我也看不懂索性直接copy,只要知道它是随机选取分支就好
首先会进行执行必要的初始化,决定处理case的两个顺序 :

order1会被分为pollorder和lockorder,这两个slice将会真正决定select的随机选择以及死锁问题

    norder := 0
    for i := range scases {
        cas := &scases[i]
//对于channel为nil的收发操作,将他们的elem设置为nil
        if cas.c == nil {
            cas.elem = nil // allow GC
            continue
        }
// fastrabdb为生成随机数函数
//porder刚开始是0,循环结束后是随机顺序的scases索引
        j := fastrandn(uint32(norder + 1))
        pollorder[norder] = pollorder[j]
        pollorder[j] = uint16(i)
        norder++
    }
    pollorder = pollorder[:norder]
    lockorder = lockorder[:norder]

     ....
     sellock(scases, lockorder)
     ....

这里的轮询顺序pollorder是随机的,避免channel的饥饿问题,保证公平性,之后根据channel的地址顺序确定加锁顺序来避免死锁发生(sellock函数)

如果多个 goroutine 都需要锁定 ch1 ch2,而他们加锁的顺序不固定,那么很可能会出现死锁问题
这个时候,对加锁的顺序就有要求了,按照同样的顺序的话,没有竞争到 ch1.lock 的 goroutine,会等待加锁 ch1.lcok,而不会直接去加锁 ch2.lock

func sellock(scases []scases, lockorder []int16) {
    var c *hchan
    for _, o := range lockorder {
        c0 := scases[0].c // 根据加锁顺序获取 case

        // c 记录了上次加锁的 hchan 地址,如果和当前 *hchan 相同,那么就不会再次加锁
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}

加锁完成,进入selectgo主循环逻辑
第一阶段的主要职责是查找所有 case 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder {
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()
        sg.g = gp
        sg.c = c

        if casi < nsends {
            c.sendq.enqueue(sg)
        } else {
            c.recvq.enqueue(sg)
        }
    }

    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    ...
}

如果不能立刻找到活跃的 Channel 就会进入循环的下一阶段,按照需要将当前 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中
除了将当前 Goroutine 对应的 runtime.sudog 结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒。
等到 select 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo 函数的第三部分,从 runtime.sudog 中读取数据:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    sg = (*sudog)(gp.param)
    gp.param = nil

    casi = -1
    cas = nil
    sglist = gp.waiting
    for _, casei := range lockorder {
        k = &scases[casei]
        if sg == sglist {
            casi = int(casei)
            cas = k
        } else {
            c = k.c
            if int(casei) < nsends {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
        }
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        releaseSudog(sglist)
        sglist = sgnext
    }

    c = cas.c
    goto retc
    ...
}

第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecv 和 bufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:

bufrecv:
    recvOK = true
    qp = chanbuf(c, c.recvx)
    if cas.elem != nil {
        typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    selunlock(scases, lockorder)
    goto retc

bufsend:
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    selunlock(scases, lockorder)
    goto retc

这里在缓冲区进行的操作和直接调用 runtime.chansendruntime.chanrecv 差不多,上述两个过程在执行结束之后都会直接跳到 retc 字段。

两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:

rclose:
    selunlock(scases, lockorder)
    recvOK = false
    if cas.elem != nil {
        typedmemclr(c.elemtype, cas.elem)
    }
    goto retc

sclose:
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))

总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。

参考

1.go语言channel
2.go 深入刨析
3.go ready函数
4.go夜读

上一篇下一篇

猜你喜欢

热点阅读