go语言

Go语言——channel详解

2018-11-01  本文已影响0人  陈先生_9e91

Go语言——channel详解

channel和goroutine是go语言最具特色是结构,有必要仔细研究。

源码路径:go1.10\src\runtime\chan.go

struct

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   lock mutex
}

从上面可以看出,waitq非常重要

type waitq struct {
   first *sudog
   last  *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    g *g

    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

waitq是一个queue,具有头尾指针。

sudog封装了等待chann接收/发送的G。sudogs有一个特殊的池,提高性能。

new

chan不能直接new,只能make,所以观察makechan方法。make的时候,会根据size判断是缓冲chan还是非缓冲chan,这个逻辑需要注意。

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   var c *hchan
   switch {
   case size == 0 || elem.size == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = unsafe.Pointer(c)
   case elem.kind&kindNoPointers != 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
    
   return c
}

这里看到由于要对buf对象分配,所以没有提供new方法,只能make。除了对default情况比较容易理解,另外两只buf的内存分配都不是很理解=。=,先遗留。

chansend

send & revc
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   lock(&c.lock)

   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
  1. lock
  2. 如果chann close,就直接panic,因为不能往close chan send数据
  3. 如果有G在等待接收数据,就直接将数据发给它。
   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
  1. 如果buf还有剩余,就将数据保存在buf里面
  2. sendx++,跟上面的gif动图一样,指向下一个send位置
  3. sendx == 0表示buf满了
   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()

   mysg.elem = ep
   mysg.g = gp
   c.sendq.enqueue(mysg)

   releaseSudog(mysg)
   return true
}

以上情况不满足的话,接下来要做的就是block发送数据的g,看看chan具体怎么做的

  1. 获取当前g,注意这里的g是往chan发数据的g
  2. 从pool中获取一个sudog,保存当前g信息,方便后面唤醒
  3. 将sudog入队到sendq发送等待队列
  4. 释放sudog这里不是很懂

chanrecv

没找到动图,就不贴了。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   lock(&c.lock)
    
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }    

   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
  1. lock
  2. 如果chan已经close,并且没有数据,就直接返回空值
  3. 如果有G正在等待发送数据,分情况
  4. 如果chan没有buf,就直接将G的数据发送给当前g
  5. 如果chan有buf, 就从buf头中取出数据G发送给当前g,并且唤醒G,然后把数据写入buf尾部

这里有两个g,一个是G表示等待发送数据的g,一个g表示接受chan数据的g。

这里ep对象有点意思,send中ep表示发送到的数据,揣测recv会从堆中取出数据,然后将ep指针指向这个堆数据。

    if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)

      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

如果chan有数据,就直接从buf里面取出来,然后维护recvx和qcount

   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   
   mysg.elem = ep
   mysg.g = gp
   c.recvq.enqueue(mysg)

   releaseSudog(mysg)
   return true, !closed
}

以上情况都不满足,即chan没有数据,也没有等待发送的g。那就需要block当前接收的g

  1. 获取当前等待接收数据的g
  2. 获取sudog
  3. 用sudog封装g信息
  4. 将sudog入队到recvq等待接收队列
  5. 释放sudog
上一篇 下一篇

猜你喜欢

热点阅读