go context包源码分析
context包以及包内方法用以维护一组goroutine间的生命周期的截止,以及同生命周期内的共享变量
本文面向有一定go基础的同学
如有概念错误欢迎拍砖 openex@qq.com
arch: amd64
version:go1.10
数据结构上以一种单链表方法维护,由子节点指向父节点,同一父节点可以被多个子节点指向.最终指向一个空节点(emptyCtx),通常由c.Background()提供(或context.TODO()) cancelCtx类型的context结构内含有一个hashmap维护该节点被哪些子节点指向,以便于结束信号的传递.生命周期是否已结束的判定条件通过内部的channel结构实现,主要使用了读closed状态channel时将返回类型零值的特性.
我画了一个使用时可能出现的结构图,具体的类型含义将在后文中解释
一种程序中的context结构
0.Context接口
type Context interface {
//Deadline()返回该context是否有截止时间(timerCtx),如果有什么时候(time.Time)
Deadline() (deadline time.Time, ok bool)
//Done() 返回一个只读的channel,使用者通过从此channel读到一个值得知context已结束
Done() <-chan struct{}
//Err()返回context由什么原因结束.手动结束?超过截止时间?
Err() error
//Value(...)根据提供的key在context中遍历是否有这个key,如果有则返回其value否则返回nil
Value(key interface{}) interface{}
}
程序员可以自己实现这个接口,参与到context中,比如你想维护一个更加复杂的context节点,但是由不想自己去维护生命周期或者想加入到某个context树中
1.私有结构emptyCtx
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
细心地你一定发现了我画的图中,最终都指向了emptyCtx,这个emptyCtx是使用context包的一种常规方式,因为稍后你会发现,包内方法总是让提供一个父级context用以关联.当然你可以自己去实现一个context,但是这个empty的context通常是符合需求或规约的.
这个实现使用了几个技巧:
- Deadline()内直接return将会使在函数初始化时被置为零值的多个返回值直接返回给调用方
- Done() 读一个nil的channel会永远阻塞, 而使用时也是会通过读这个channel有无值判定context是否结束
- Value() 因为某个生命周期组是一个单链表结构,寻找value时会一直向上遍历,若用emptyCtx作为终点则符合"找不到返回nil"
2.私有结构cancelCtx
var closedchan = make(chan struct{})
func init() {
close(closedchan)
}
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
cancelCtx是实现了Context接口的一个结构,他是实现生命周期概念的主要结构.
我们先看看结构定义
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
其中的第一个Field是Context一种嵌入类型的用法,关于go的嵌入类型实现细节本文不描述,你需要知道的是cancelCtx把Context接口这个Field传进来的具体实现类型的接口相关方法"继承"了下来,当然也可以自行定义该类型的同名方法进行"重载".(这块说的比较绕,如果不理解可以自己编码理解下)同时,后边会介绍context包内方法,其中是将父context绑定到这个Context Field上,实际上是一种类似单链表的绑定关系,由子指向父
*我为你提供了一个Demo,这里就不再多说细节,未来会写个博文进行源码分析
package main
import "fmt"
type x struct{}
func (_ *x) Done()int {return 1}
func (_ *x) Fix() int {return 2}
type aa interface {
Done() int
}
type n struct{
aa
}
func (_ *n) Done() int{return 3}
func main(){
a := n{
&x{},
}
fmt.Println(a.Done())
//fmt.Println(a.Fix())
}
回过头来,我们继续分析cancelCtx的结构,其中的mu和err 一个是用来保护Field读写操作的互斥锁,err是描述context生命周期结束的原因.
done是一个空类型channel(空类型struct{}是一种编码优化方式,编译器会把这种正文都编译成指向一个程序初始化时一个机器字长大小的不变量上,而关于这个类型相关的move操作长度均为0),调用方间接通过这个channel能否读出值判断生命周期是否结束,如果读出来代表生命周期已经结束(我们很难知道调用方会读取多少次,所以不能事前分析调用数量向channel写值,这里使用了channel的一个特性:读取被关闭的channel将立刻返回channel类型的零值)
children是一个hashmap,key为实现canceler接口的实体,value为空类型.这个filed的作用是:当本身或父级context传来结束生命周期信号时(调用了自身的cancel方法),通过这个map寻找所有指向本节点的子节点,并调用他们的cancel方法.
思考:为什么使用这种hashmap结构维护children关系?而不是使用线性结构?
我觉得这里主要的作用是做去重,因为重复的节点去进行cancel虽然幂等但无意义浪费指令,而使用线性表进行去重由可能有O(n)的开销(如果扇出很少也未尝不可),因为是一个开源的包,作者不知道使用者的场景,而目前版本的hashmap设计在插入和删除操作上也做的比较平衡,做为折中使用这种比较均衡的方法.
从这个思考也想到一个事,很多的开源包虽然性能很NICE和稳定,但是作者不了解用户的具体场景,同时又想照顾更多的用户,所以难免会有一些折中和妥协,所以在具体的软件开发工作中,针对自己的场景进行特定开发可能会有更大的收益(考虑Amdahl定律的情况下)
扯远了,回来我们来看看cancel的方法
Done()方法,返回了一个只读的channel,实现上会先判断绑定的cancelCtx done成员是否为nil如果是则给他make一个channel,否则返回这个channel,这个done是惰性分配的,因为是为了需要判断生命周期时才会用到Done()方法,进而用到done成员,若从不使用就不需要浪费内存了
Err() String()方法比较简单,自己扫一下就可以
cancel(removeFromParent bool, err error)方法,主要功能上是要关闭cancelCtx.done(如果原来是nil就把一个初始化时主动关闭的一个closechan传入),然后通过遍历cancelCtx.children 调用他们的cancel方法,传递这个生命周期关闭的"信号",最后根据removeFromParent字段判断是否需要让自己从父context的map中删除.
3.包内方法WithCancel
type CancelFunc func()
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c) //尝试把c与父节点绑定(写入父节点的hashmap)
return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() { //当父context为不识别的或emptyctx时创建独立goroutine维护生命周期
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
WithCancel方法返回一个封装后的cancelCtx和绑定的关闭函数
我们先看看parentCancelCtx函数
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
逻辑是以传入的context为起点向根据当前context的类型向上寻找cancelCtx,若没找到则会返回false
propagateCancel函数
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() { //当父context为不识别的或emptyctx时创建独立goroutine维护生命周期
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
这个函数会先寻找含当前context向上的第一个cancelCtx,然后把自己添加到它的children中(若children为nil则先初始化).若没找到cancelCtx则该节点的父节点链路中没有cancelCtx或者是用户自行设计的context结构,所以这块是启动了一个goroutine来监听父context的Done信号
removeChild函数,是先寻找父cancelCtx若有,则删除其children中和参数相关的key
4.私有结构timerCtx
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, time.Until(c.deadline))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
timerCtx结构内嵌入和cancelCtx,重载了cancel和String方法,这个结构主要用以维护一个有确定截止时间的生命周期,过期后自动cancel
5.包内方法WithDeadline 和 WithTimeout
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
WithDeadline返回一个 可能的初始化一个timerCtx和绑定的关闭函数,为什么说是可能呢,因为逻辑中会先判断父Context链中是否有timerCtx并判断它的终止时间是不是比当前传入的d还要早,若是则直接初始化一个cancelCtx返回.继续向下看逻辑,会判断一下传入的截止时间是否已经到期,若果是则直接cancel掉.若流程继续,则会调用time.AfterFunc 超时后自动关闭context
WithTimeout是对WithDeadline的一个封装
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
6.valueCtx和WithValue
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
valueCtx嵌入了一个Context,然后增加了两个成员key和val,再观察下Value方法可以看到c.Context.Value(key),这个操作,实际上就是一种链表的遍历了,还记得最开始讲的emptyCtx吗,它的Value方法会返回一个nil,正如文件里所说,nil的含义在这块就理解成没有找到成员了.
值得注意的是WithValue对valueCtx初始化时,会调用一次!reflect.TypeOf(key).Comparable(),也就是说要判断下传进来的key是否有可以进行比较,若不能进行比较则会panic,同样key为nil时也会panic,所以这块的约束比较严格,就需要使用者对这k
7.DEMO
package main
import (
"context"
"fmt"
"time"
)
type testStr string
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
producerChan := make(chan int, 10)
go func() {
for {
producerChan <- 1
time.Sleep(100 * time.Millisecond)
}
}()
var (
k1 testStr = "key1"
k2 testStr = "key2"
)
ctxV1 := context.WithValue(ctx, k1, "value1")
ctxV2 := context.WithValue(ctxV1, k2, "value2")
go func() {
done := ctxV2.Done()
haveKey := testStr("key2")
fmt.Printf("key=key2 value=%v\n", ctxV2.Value(haveKey))
for {
select {
case <-done:
fmt.Println(ctxV2.Err())
return
case v := <-producerChan:
fmt.Println(v)
}
}
}()
time.Sleep(3 * time.Second)
cancel()
time.Sleep(time.Second)
}
对于Done()方法的使用,我看过很多源码中使用的很不正确经常出现如下代码:
错误
for {
select {
case <-ctxV2.Done():
...
}
}
我一开始以为编译器前端会把这种代码自动优化成
done := ctxV2.Done()
for {
select {
case <-done:
...
}
}
然而实际的测试结果并不是这样,若写成错误的那种代码,若select被循环调用的很频繁那Done()也可能会很频繁的被调用,回顾下上文Done的实现,有抢锁等操作很没有必要的开销