Go: channel source code implementation

2019-02-15  董泽润

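There are plenty of articles about the channel implementation, many of them copied from one another, and this one offers nothing new either. The implementation is not complicated at all, so let's go straight to the code in src/runtime/chan.go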

The channel struct

Let's first look at how the channel struct is defined. The code comments are already good enough, and the fields are lean

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

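qcount is the number of elements currently buffered and dataqsiz is the maximum number that can be buffered, much like len and cap of a slice. In fact len(ch) and cap(ch) simply read these two fields. buf is a pointer to the underlying buffer, a fixed-length circular array. elemsize and elemtype are the size and type of the element, which is easy to understand: the element type is fixed when the channel is created. sendx and recvx are indexes into the circular array, and recvq and sendq are the queues of waiting consumers (receivers) and producers (senders). A nonzero closed means the channel is closed and no more sends are allowed. lock is one big lock; anyone with some experience knows what a big lock implies: performance suffers when there are many producers.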
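
As a small illustration of the len/cap relationship described above, here is a minimal sketch. The helper name bufferState is made up for this example; it only relies on the built-in len and cap.

```go
package main

import "fmt"

// bufferState (hypothetical helper) sends n values into a channel with the
// given capacity and reports what len and cap return afterwards.
// n must not exceed capacity, otherwise the send would block forever.
func bufferState(capacity, n int) (length, capa int) {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		ch <- i
	}
	// len(ch) reflects the buffered element count (qcount),
	// cap(ch) reflects the buffer capacity (dataqsiz).
	return len(ch), cap(ch)
}

func main() {
	l, c := bufferState(10, 3)
	fmt.Println(l, c) // 3 10
}
```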

Creating a channel

quitCh1 := make(chan int, 10)
quitCh2 := make(chan int)
quitCh3 := make(chan *SomeStruct, 10)

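These are the usual ways to create a channel. The first creates an int channel with a buffer capacity of 10, the second creates an int channel with a buffer capacity of 0 (unbuffered), and the third also has a capacity of 10 but its element type is a pointer.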

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case size == 0 || elem.size == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}
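  1. First, check elem.size, the size of the stored element: it must be less than 1<<16 (64 KiB). Ordinary structs are nowhere near that large
  2. hchanSize is the size of the hchan struct itself plus some padding. It must be a multiple of maxAlign, and the element alignment elem.align must not exceed maxAlign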
    maxAlign  = 8
    hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

In the source file, maxAlign is 8 and hchanSize is sizeof(hchan) rounded up to a multiple of maxAlign
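
The rounding trick in the hchanSize expression can be tried in isolation. This is a minimal sketch: roundUp is a name invented here, and only the arithmetic is taken from the expression above.

```go
package main

import "fmt"

const maxAlign = 8

// roundUp (hypothetical helper) applies the same arithmetic as hchanSize:
// -n & (maxAlign-1) is the padding needed to reach the next multiple of
// maxAlign, and it is 0 when n is already aligned.
func roundUp(n uintptr) uintptr {
	return n + uintptr(-int(n)&(maxAlign-1))
}

func main() {
	for _, n := range []uintptr{1, 8, 13, 96} {
		fmt.Println(n, "->", roundUp(n))
	}
}
```

This works because maxAlign is a power of two, so masking with maxAlign-1 keeps only the low bits.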

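  3. Check that the requested channel size is valid
  4. The switch block handles three cases when creating the hchan struct and allocating the underlying buf. What matters is whether the element type contains pointers: with kindNoPointers (no pointers in the elements), hchan and buf are allocated together in one contiguous block; otherwise hchan and buf are allocated separately. If size == 0 (an unbuffered channel) or the element size is 0, no buf is allocated
  5. Finally, set elemsize, elemtype and dataqsiz, and return the hchan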
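
The size check in makechan can be observed from user code. The following is a sketch under the assumption that the size is only known at run time (a constant negative size is rejected by the compiler instead); tryMake is a name made up for this example.

```go
package main

import "fmt"

// tryMake (hypothetical helper) calls make with a run-time size and returns
// the recovered panic message, or "" if make succeeded.
func tryMake(size int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, size)
	_ = cap(ch)
	return ""
}

func main() {
	// A negative size fails the "size < 0" check in makechan and panics.
	fmt.Println(tryMake(-1))
	// A valid size returns normally, so the message is empty.
	fmt.Printf("%q\n", tryMake(10))
}
```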

Syntactic sugar for reading and writing

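We usually read channel data in a select case. The left-hand side can have two variables: the first is the value read from the channel, and the second is a bool that says whether the value is valid. If the channel has already been closed and has no data left, the value read is not real data and the second variable is false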

case v, ok = <-c:
case v = <-c:

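Those are the two common forms. When they appear in a select with a single case and a default branch, the compiler expands them into selectnbrecv2 or selectnbrecv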

compiler implements

    select {
    case v, ok = <-c:
        ... foo
    default:
        ... bar
    }

as

    if c != nil && selectnbrecv2(&v, &ok, c) {
        ... foo
    } else {
        ... bar
    }

This is a comment from the official source. There is similar sugar for send, which you can look up yourself.
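
Seen from user code, the non-blocking forms look like the sketch below. tryRecv and trySend are names made up for this example; the select statements inside them are the patterns the comment above describes.

```go
package main

import "fmt"

// tryRecv (hypothetical helper) performs a non-blocking receive.
// selected reports whether the receive case ran instead of default.
func tryRecv(c chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default:
		return 0, false, false
	}
}

// trySend (hypothetical helper) performs a non-blocking send.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer has room
	fmt.Println(trySend(c, 2)) // false: the buffer is full
	fmt.Println(tryRecv(c))    // 1 true true
	fmt.Println(tryRecv(c))    // 0 false false: nothing to read

	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false: a nil channel is never ready
}
```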

Producing data

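There are four parameters: c is the channel, ep points to the data to be sent, block says whether the send is blocking (a select statement can be used to make a non-blocking send), and callerpc is the caller's program counter, which is passed to the race detector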

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
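    if c == nil { // check whether the channel is nil
        if !block { // nil channel and non-blocking: return false right away
            return false
        } // nil channel and blocking: the caller parks forever (gopark never returns here; no panic is raised)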
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
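    // The fast path is a quick check: not every case needs the lock, and sometimes we can return right away. It returns false when all three conditions hold:
    // 1. the send is non-blocking
    // 2. the channel is not closed
    // 3. the capacity is 0 and no receiver is waiting, or the capacity is nonzero and the buffer is full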
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock) // from here on we hold the dreaded lock

    if c.closed != 0 { // sending on a closed channel: unlock, then panic
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil { // a receiver is waiting, which means buf is empty and the receiver is blocked; hand the value to it directly without going through buf
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send must unlock before it returns
        return true
    }

    if c.qcount < c.dataqsiz { // buf has free space, so write into buf
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx) // compute the slot for the new element
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep) // copy the value into that slot
        c.sendx++ // advance sendx (wrapping around) and qcount, then unlock
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
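        unlock(&c.lock) // lock and unlock come in pairs; don't forget to unlock
        return true // the send succeeded
    }

    if !block { // buf is full and the send is non-blocking: unlock and return false to report that the send failed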
        unlock(&c.lock)
        return false
    }

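    // Block on the channel. Some receiver will complete our operation for us.
    // There is no space and the send is blocking, so block here until someone wakes us up.
    gp := getg() // get the current goroutine
    mysg := acquireSudog() // mysg wraps the waiting goroutine and its data; it matters a lot below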
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep // stash the value to send in mysg.elem for later use
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg) // enqueue ourselves on the channel's sendq
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // give up the processor: the sending goroutine parks here until it is woken

    // someone woke us up. If gp.waiting is no longer our mysg, the data has been corrupted, so crash with throw.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
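    gp.waiting = nil // reset
    if gp.param == nil { // a nil gp.param means no receiver completed our send: panic if the channel was closed, otherwise the wakeup was spurious. Pair this with recv below, where gp.param is set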
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true // the send succeeded and mysg has been released
}
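
The buffered branch of chansend is ordinary ring-buffer bookkeeping. Below is a deliberately simplified, single-goroutine sketch of it. The ring type and its methods are made up for illustration; there is no locking, no wait queue and no type-aware copy, so it is a model of the index arithmetic and not the runtime's code. The recv method mirrors the buffered branch of chanrecv shown later.

```go
package main

import "fmt"

// ring is a hypothetical model of hchan's buffer fields.
type ring struct {
	buf    []int // stands in for buf with dataqsiz == len(buf)
	qcount int
	sendx  int
	recvx  int
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// send mirrors the "c.qcount < c.dataqsiz" branch; false means the real
// channel would block or fail a non-blocking send.
func (r *ring) send(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// recv mirrors the "c.qcount > 0" branch of chanrecv.
func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the consumed slot
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.send(1), r.send(2), r.send(3)) // true true false
	fmt.Println(r.recv())                        // 1 true
	fmt.Println(r.send(3))                       // true: reuses slot 0
	fmt.Println(r.recv())                        // 2 true
	fmt.Println(r.recv())                        // 3 true
}
```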

If a receiver is already waiting for data, the sender hands the value over directly. Here is the source

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}


func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // No need for cgo write barrier checks because dst is always
    // Go memory.
    memmove(dst, src, t.size)
}

Consuming data

Correspondingly, the side that reads the data is the consumer

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil { // receiving from a nil channel is a special case
        if !block { // non-blocking: just return
            return
        } // blocking: the caller parks forever (no panic is raised)
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not ready for receiving, we observe that the
    // channel is not closed. Each of these observations is a single word-sized read
    // (first c.sendq.first or c.qcount, and second c.closed).
    // Because a channel cannot be reopened, the later observation of the channel
    // being not closed implies that it was also not closed at the moment of the
    // first observation. We behave as if we observed the channel at that moment
    // and report that the receive cannot proceed.
    //
    // The order of operations is important here: reversing the operations can lead to
    // incorrect behavior when racing with a close.
    // This is also a quick check that avoids taking the lock, similar in principle to the one in chansend.
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 { // the channel is closed and has no unread data: unlock and return the zero value
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
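        if ep != nil { // the key point: typedmemclr sets the result ep to the zero value
            typedmemclr(c.elemtype, ep)
        } // the second result is false, so a zero value read from a channel is not necessarily real data; the channel may have been closed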
        return true, false 
    }

    if sg := c.sendq.dequeue(); sg != nil { // a sender is blocked on the channel, so take the data via that sender
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 { // no sender is waiting but buf has data, so read from buf
        // Receive directly from queue: locate the first unread element
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil { // copy the first unread element into ep
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp) // clear the consumed slot qp in the circular array, then update recvx
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock) // unlock and return true, true
        return true, true
    }

    if !block { // no data is available and the receive is non-blocking: unlock and return
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg) // enqueue the receiver's mysg, then park below until woken
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
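
The closed-channel branch explains a behavior that is easy to observe: buffered values are still delivered after close, and only then does the receiver get a zero value with ok == false. A minimal sketch, with drainClosed as a made-up helper name:

```go
package main

import "fmt"

// drainClosed (hypothetical helper) buffers vals, closes the channel, and
// then receives len(vals)+1 times, recording each value and ok flag.
func drainClosed(vals ...int) (got []int, oks []bool) {
	c := make(chan int, len(vals))
	for _, v := range vals {
		c <- v
	}
	close(c)
	for i := 0; i <= len(vals); i++ {
		v, ok := <-c
		got = append(got, v)
		oks = append(oks, ok)
	}
	return got, oks
}

func main() {
	// The second value is a genuine 0 (ok == true); the third is the zero
	// value produced because the channel is closed and empty (ok == false).
	fmt.Println(drainClosed(7, 0)) // [7 0 0] [true true false]
}
```

This is why the second result matters whenever zero is a legitimate value for the element type.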

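The recv function handles the case where a sender is blocked on the channel. For an unbuffered channel the data is taken directly from the sender; for a buffered channel the receiver takes the head of the queue and the sender's value is written into the slot that was just freed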

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 { // no buffer: take the data directly from the sender
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx) // read from buf; a sender is blocked here, so buf must be full
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil { // copy the element at that slot of buf to the receiver
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue: write the blocked sender's value into that slot
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil // the value the sender parked in sg.elem has been delivered, so clear it
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg) // gp.param is set to sg; this is why chansend checks the field after waking up
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1) // wake the sender
}
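
From user code, the effect of the buffered branch of recv is that FIFO order is preserved even when a sender is blocked on a full buffer. The sketch below uses a made-up function name, fifoWithBlockedSender. Whether the first receive actually goes through recv or through the plain buffer path depends on scheduling, which is an assumption this example cannot control; the observable order is the same either way.

```go
package main

import "fmt"

// fifoWithBlockedSender (hypothetical helper) fills a one-slot buffer,
// starts a second sender that will most likely block, and then receives twice.
func fifoWithBlockedSender() []int {
	c := make(chan int, 1)
	c <- 1 // the buffer is now full

	done := make(chan struct{})
	go func() {
		c <- 2 // blocks until the receiver frees the slot
		close(done)
	}()

	first := <-c // head of the queue; a parked sender's value takes the freed slot
	<-done       // wait until the second send has completed
	second := <-c
	return []int{first, second}
}

func main() {
	fmt.Println(fifoWithBlockedSender()) // [1 2]
}
```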