Go 语言的 Channel - 源码分析

2018-12-15  本文已影响0人  达菲格

这部分看的是 golang1.2 的源码(其实是 3 年前看的,最近又拿出来复习整理了下),C语言实现的。

如果只是为了学习,而不是为了实际参与项目的开发,还是建议阅读低版本源代码,因为更纯粹些。

越是新的版本,里面掺杂的周边逻辑越多,比如 race, debug, profile 等等,这些周边功能就像打日志一样,到处都是,对阅读源码有较强的干扰。

基本原理

Channel 的内部主要结构挺简单的,本质上就是一个循环队列。如下图:

Channel 内部结构

使用一个循环队列来实现channel buffer,如果是非buffer channel,那么这个队列长度为0.

dataqsiz表示队列总长度,即cap(ch), dataq表示 buffer 中元素的个数,即len(ch)

sendxrecvx分别表示队列的游标,表示队列中元素的头和尾。

channel 还拥有两个 goroutine 链表,sendqrecvq;链表中存放的是因该 channel 阻塞住的 goroutine。
一个 goroutine 操作这个channel(发送或者读取),如果发生阻塞,都会放到相应的链表中;如果没发生阻塞,会尝试从相应的链表中唤醒一个goroutine。

ch <- 1为例子,流程大致如下:

  1. 检查channel是否能正常写入(即队列没有满),如果能则运行第2步, 否则执行第3步.
  2. 将数据写入buffer, 唤醒 recvq 中的一个 goroutine,并把 recvx 处的数据 copy 给刚唤醒的 goroutine.
  3. 把自己加到 sendq 中,然后 block. 该 goroutine 会等到后续的 <-ch 操作被叫醒.

var ch chan int

注意看下面代码的注释。

// 对goroutine的一个封装,专门给channel使用
struct  SudoG 
{
        G*      g;
        SudoG*  link;
        byte*   elem;
        ...
};
struct  WaitQ   // goroutine队列, 链表结构
{
        SudoG*  first;
        SudoG*  last;
};

struct  Hchan // 我们的channel,各个成员都在注释中说明
{
        uintgo  qcount;    // len(ch)
        uintgo  dataqsiz;  // cap(ch)
        uint16  elemsize;  // 单个元素大小,因为我们声明的是 chan int 类型,这里就是 sizeof(int)
        bool    closed;    // 是否关闭, 执行 close(ch) 后就关闭了
        uintgo  sendx;     // send index for buffer
        uintgo  recvx;     // receive index for buffer
        WaitQ   recvq;     // 接收的goroutine队列,单链表
        WaitQ   sendq;     // 发送的goroutine队列,单链表
        Lock;
};

make(chan int, 10)

runtime·makechan_c(ChanType *t, int64 hint) {
    // ....
        c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0);
        c->elemsize = elem->size;
        c->dataqsiz = hint;
        // ...
        return c
}

我们可以看到,make一个channel,就是开辟了一块连续的内存空间,内存的大小就是Hchan结构本身 加上 channel buffer大小: sizeof(c) + hint*elem->size

ch <- 1

写入操作,这个函数很长,下面分成了几个分支来说明

当 ch 是 nil 时

channel是一个nil值, 会使 goroutine 阻塞住

if(c == nil) {
    //...
    runtime·park(nil, nil, "chan send (nil chan)");
    return;  // not reached
}

如果 ch 已经被 close 了

channel 已经关闭,panic

        runtime·lock(c);
        if(c->closed)
                goto closed; // unlock an panic
        // ....
closed:
        runtime·unlock(c);
        runtime·panicstring("send on closed channel");

如果 ch 是 buffer 的

处理带 buffer 的channel,仔细看注释

runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
        if(c->dataqsiz > 0) // 带buffer 的 channel
                goto asynch; // 异步
        // ...
asynch:
        if(c->closed)
                goto closed;
        if(c->qcount >= c->dataqsiz) { // buffer 满了
                enqueue(&c->sendq, &mysg); // 放到sendq中
                runtime·park(runtime·unlock, c, "chan send");// 阻塞
                goto asynch; // 回去重新检查
        }

       // 把数据copy到buffer中
        c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
        if(++c->sendx == c->dataqsiz) // 修改队列游标
                c->sendx = 0;
        c->qcount++;

        sg = dequeue(&c->recvq); // 把 recvq 里的找到一个goroutine
        if(sg != nil) {
                gp = sg->g;
                runtime·ready(gp); // 唤醒
        }
}

ch 不是 buffer 的

非 buffer channel 的处理方式

runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
        sg = dequeue(&c->recvq); // 从 recvq 获取一个等待该channel的goroutine
        if(sg != nil) {
                c->elemalg->copy(c->elemsize, sg->elem, ep); // 把值 copy 给这个 goroutine
                runtime·ready(gp); // 唤醒
                return;
        }
        enqueue(&c->sendq, &mysg); // 把自己放到发送队列 sendq 中
        runtime·park(runtime·unlock, c, "chan send"); // 阻塞自己
}

v, ok := <-ch

和上面的过程几乎是一模一样,不重复了。唯独有点不同的就是对关闭 channel 的处理。从一个已关闭的 channel 中读取数据,是不会 panic 的,而是直接返回。

closed:
        if(ep != nil)
                c->elemalg->copy(c->elemsize, ep, nil);
        if(selected != nil) // 如果在select语句里
                *selected = true;
        if(received != nil) // ok 值
                *received = false;
        runtime·unlock(c);

select

结构

truct   Scase
{
    SudoG   sg;         // must be first member (cast to Scase)
    Hchan*  chan;           // chan
    byte*   pc;         // return pc
    uint16  kind; // 类型, recv或send或default
    bool*   receivedp;      // pointer to received bool (recv2)
};

struct  Select
{
    uint16  tcase;          // total count of scase[]
    uint16  ncase;          // currently filled scase[]
    uint16* pollorder;      // case poll order
    Hchan** lockorder;      // channel lock order
    Scase   scase[1];       // one per case (in order of appearance)
}

pollorder是用来遍历lockorder的,为了实现乱序,把pollorder设置成乱序的数组,然后用其值作为索引遍历lockeorder。支持乱序的原因是,避免如果第一个 channel 总是有数据,那其他的 channel case 就永远没机会执行了。

乱序

select 中所有 case 如果都满足达到非阻塞条件,哪个会被执行是随机的。这个随机是在程序里故意实现的。下面是一个乱序算法。

for(i=0; i<sel->ncase; i++)
        sel->pollorder[i] = i;
for(i=1; i<sel->ncase; i++) {
        o = sel->pollorder[i];
        j = runtime·fastrand1()%(i+1);
        sel->pollorder[i] = sel->pollorder[j];
        sel->pollorder[j] = o;
}

三次循环

Select的源代码函数里,有三个主要的循环。太长了这里不贴代码了。

  1. 循环检查(乱序遍历)所有 case 看是否有满足的channel,有就直接执行,然后return,否则执行第2步。
  2. 把 goroutine(也就是自己)加到所有 case 的 channel 的发送或接收队列中,然后阻塞,等待被叫醒。
  3. 被其中一个 case 的 channel 唤醒,把自己从其他所有 case 的 channel 的队列中删除,设置 PC 值,即被唤醒后进入哪个 case。

看着效率很差,但我们一般在代码中也写不了多少个 case,一般都是三五个,也不会导致性能下降。

close(ch)

关闭channel, 很简单。如果是 nil 或者已经被 close 了,直接 panic。

关闭后,会唤醒 recvqsendq 两个链表中的所有 goroutine。

if(c == nil)// nil channel
        runtime·panicstring("close of nil channel");
if(c->closed) { // closed channel
        runtime·panicstring("close of closed channel");
}

c->closed = true;

// release all readers
for(;;) {
        sg = dequeue(&c->recvq);
        if(sg == nil)
                break;
        runtime·ready(gp);
}

// release all writers
for(;;) {
        sg = dequeue(&c->sendq);
        if(sg == nil)
                break;
        runtime·ready(gp);
}

sendq 里的 goroutine 一旦被唤醒,就会 panic,因为它在尝试向一个关闭 channel 发数据。所以在参数传递时,会把 channel 做类型转换,声明下它是消费者还是生产者,即 <-chan Typechan<- Type 2 种类型。避免消费者随意关闭 channel 导致生产者 panic。

nil channel 的用途

通常用来暂时屏蔽一个 channel,比如:


var done <-chan struct{} // 初始值是 nil

for {
  select {
    case <-done:
        return
    case <-input:
      // 代码逻辑
      if canReturn {
        done = ctx.Done()
      } else {
        done = nil
      }
  }
}

代码中,虽然要被 context 来控制退出,但如果存在某些特殊状态,不允许被终止。就要用到 nil channel 了。

总结

非 buffer 的 channel 比 buffer channel 少了一次内存 copy。但非 buffer channel 工作起来基本就是相当于个互斥锁,会让 goroutine 无法并行,在多核机器上会导致程序的处理效率很差(即最大并发量很低,机器的 CPU 利用率低)。所以如果 channel 是一个常驻型的,直接make 一个大一点的 buffer channel 没关系。

通过 close channel 做简单的广播通知,这个很常用,官方 context 库也是这么做的。并不一定要用 context 库去通知,一个非 buffer 的 chan struct 变量就够。

channel 也是一种数据类型,即使没有 close 也是可以被 GC 的,没必要去做特殊的管理。

上一篇下一篇

猜你喜欢

热点阅读