Go 语言的 Channel - 源码分析
这部分看的是 golang1.2 的源码(其实是 3 年前看的,最近又拿出来复习整理了下),C语言实现的。
如果只是为了学习,而不是为了实际参与项目的开发,还是建议阅读低版本源代码,因为更纯粹些。
越是新的版本,里面掺杂的周边逻辑越多,比如 race
, debug
, profile
等等,这些周边功能就像打日志一样,到处都是,对阅读源码有较强的干扰。
基本原理
Channel 的内部主要结构挺简单的,本质上就是一个循环队列。如下图:
Channel 内部结构使用一个循环队列来实现channel buffer,如果是非buffer channel,那么这个队列长度为0.
dataqsiz
表示队列总长度,即cap(ch)
, dataq
表示 buffer 中元素的个数,即len(ch)
sendx
和recvx
分别表示队列的游标,表示队列中元素的头和尾。
channel 还拥有两个 goroutine 链表,sendq
和 recvq
;链表中存放的是因该 channel 阻塞住的 goroutine。
一个 goroutine 操作这个channel(发送或者读取),如果发生阻塞,都会放到相应的链表中;如果没发生阻塞,会尝试从相应的链表中唤醒一个goroutine。
以ch <- 1
为例子,流程大致如下:
- 检查channel是否能正常写入(即队列没有满),如果能则运行第2步, 否则执行第3步.
- 将数据写入buffer, 唤醒 recvq 中的一个 goroutine,并把 recvx 处的数据 copy 给刚唤醒的 goroutine.
- 把自己加到 sendq 中,然后 block. 该 goroutine 会等到后续的
<-ch
操作被叫醒.
var ch chan int
注意看下面代码的注释。
// 对goroutine的一个封装,专门给channel使用
struct SudoG
{
G* g;
SudoG* link;
byte* elem;
...
};
struct WaitQ // goroutine队列, 链表结构
{
SudoG* first;
SudoG* last;
};
struct Hchan // 我们的channel,各个成员都在注释中说明
{
uintgo qcount; // len(ch)
uintgo dataqsiz; // cap(ch)
uint16 elemsize; // 单个元素大小,因为我们声明的是 chan int 类型,这里就是 sizeof(int)
bool closed; // 是否关闭, 执行 close(ch) 后就关闭了
uintgo sendx; // send index for buffer
uintgo recvx; // receive index for buffer
WaitQ recvq; // 接收的goroutine队列,单链表
WaitQ sendq; // 发送的goroutine队列,单链表
Lock;
};
make(chan int, 10)
runtime·makechan_c(ChanType *t, int64 hint) {
// ....
c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0);
c->elemsize = elem->size;
c->dataqsiz = hint;
// ...
return c
}
我们可以看到,make一个channel,就是开辟了一块连续的内存空间,内存的大小就是Hchan结构本身 加上 channel buffer大小: sizeof(c) + hint*elem->size
ch <- 1
写入操作,这个函数很长,下面分成了几个分支来说明
当 ch 是 nil 时
channel是一个nil值, 会使 goroutine 阻塞住
if(c == nil) {
//...
runtime·park(nil, nil, "chan send (nil chan)");
return; // not reached
}
如果 ch 已经被 close 了
channel 已经关闭,panic
runtime·lock(c);
if(c->closed)
goto closed; // unlock an panic
// ....
closed:
runtime·unlock(c);
runtime·panicstring("send on closed channel");
如果 ch 是 buffer 的
处理带 buffer 的channel,仔细看注释
runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
if(c->dataqsiz > 0) // 带buffer 的 channel
goto asynch; // 异步
// ...
asynch:
if(c->closed)
goto closed;
if(c->qcount >= c->dataqsiz) { // buffer 满了
enqueue(&c->sendq, &mysg); // 放到sendq中
runtime·park(runtime·unlock, c, "chan send");// 阻塞
goto asynch; // 回去重新检查
}
// 把数据copy到buffer中
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz) // 修改队列游标
c->sendx = 0;
c->qcount++;
sg = dequeue(&c->recvq); // 把 recvq 里的找到一个goroutine
if(sg != nil) {
gp = sg->g;
runtime·ready(gp); // 唤醒
}
}
ch 不是 buffer 的
非 buffer channel 的处理方式
runtime·chansend(ChanType *t, Hchan *c, byte *ep, ...){
sg = dequeue(&c->recvq); // 从 recvq 获取一个等待该channel的goroutine
if(sg != nil) {
c->elemalg->copy(c->elemsize, sg->elem, ep); // 把值 copy 给这个 goroutine
runtime·ready(gp); // 唤醒
return;
}
enqueue(&c->sendq, &mysg); // 把自己放到发送队列 sendq 中
runtime·park(runtime·unlock, c, "chan send"); // 阻塞自己
}
v, ok := <-ch
和上面的过程几乎是一模一样,不重复了。唯独有点不同的就是对关闭 channel 的处理。从一个已关闭的 channel 中读取数据,是不会 panic 的,而是直接返回。
closed:
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, nil);
if(selected != nil) // 如果在select语句里
*selected = true;
if(received != nil) // ok 值
*received = false;
runtime·unlock(c);
select
结构
truct Scase
{
SudoG sg; // must be first member (cast to Scase)
Hchan* chan; // chan
byte* pc; // return pc
uint16 kind; // 类型, recv或send或default
bool* receivedp; // pointer to received bool (recv2)
};
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
uint16* pollorder; // case poll order
Hchan** lockorder; // channel lock order
Scase scase[1]; // one per case (in order of appearance)
}
pollorder
是用来遍历lockorder
的,为了实现乱序,把pollorder
设置成乱序的数组,然后用其值作为索引遍历lockeorder
。支持乱序的原因是,避免如果第一个 channel 总是有数据,那其他的 channel case 就永远没机会执行了。
乱序
select 中所有 case 如果都满足达到非阻塞条件,哪个会被执行是随机的。这个随机是在程序里故意实现的。下面是一个乱序算法。
for(i=0; i<sel->ncase; i++)
sel->pollorder[i] = i;
for(i=1; i<sel->ncase; i++) {
o = sel->pollorder[i];
j = runtime·fastrand1()%(i+1);
sel->pollorder[i] = sel->pollorder[j];
sel->pollorder[j] = o;
}
三次循环
Select的源代码函数里,有三个主要的循环。太长了这里不贴代码了。
- 循环检查(乱序遍历)所有 case 看是否有满足的channel,有就直接执行,然后return,否则执行第2步。
- 把 goroutine(也就是自己)加到所有 case 的 channel 的发送或接收队列中,然后阻塞,等待被叫醒。
- 被其中一个 case 的 channel 唤醒,把自己从其他所有 case 的 channel 的队列中删除,设置 PC 值,即被唤醒后进入哪个 case。
看着效率很差,但我们一般在代码中也写不了多少个 case,一般都是三五个,也不会导致性能下降。
close(ch)
关闭channel, 很简单。如果是 nil 或者已经被 close 了,直接 panic。
关闭后,会唤醒 recvq
和 sendq
两个链表中的所有 goroutine。
if(c == nil)// nil channel
runtime·panicstring("close of nil channel");
if(c->closed) { // closed channel
runtime·panicstring("close of closed channel");
}
c->closed = true;
// release all readers
for(;;) {
sg = dequeue(&c->recvq);
if(sg == nil)
break;
runtime·ready(gp);
}
// release all writers
for(;;) {
sg = dequeue(&c->sendq);
if(sg == nil)
break;
runtime·ready(gp);
}
sendq
里的 goroutine 一旦被唤醒,就会 panic,因为它在尝试向一个关闭 channel 发数据。所以在参数传递时,会把 channel 做类型转换,声明下它是消费者还是生产者,即 <-chan Type
和 chan<- Type
2 种类型。避免消费者随意关闭 channel 导致生产者 panic。
nil channel 的用途
通常用来暂时屏蔽一个 channel,比如:
var done <-chan struct{} // 初始值是 nil
for {
select {
case <-done:
return
case <-input:
// 代码逻辑
if canReturn {
done = ctx.Done()
} else {
done = nil
}
}
}
代码中,虽然要被 context 来控制退出,但如果存在某些特殊状态,不允许被终止。就要用到 nil channel 了。
总结
非 buffer 的 channel 比 buffer channel 少了一次内存 copy。但非 buffer channel 工作起来基本就是相当于个互斥锁,会让 goroutine 无法并行,在多核机器上会导致程序的处理效率很差(即最大并发量很低,机器的 CPU 利用率低)。所以如果 channel 是一个常驻型的,直接make 一个大一点的 buffer channel 没关系。
通过 close channel 做简单的广播通知,这个很常用,官方 context 库也是这么做的。并不一定要用 context 库去通知,一个非 buffer 的 chan struct
变量就够。
channel 也是一种数据类型,即使没有 close 也是可以被 GC 的,没必要去做特殊的管理。