GO阅读-Sync包-Pool和Once

2023-03-26  本文已影响0人  温岭夹糕

环境

go1.20

1.Pool

1.1使用和注意事项

一般考虑缓存资源(复用内存,最主要是减少GC压力,减少CPU资源,因为内存分配和GC都是CPU密集操作),如创建好的对象,可以使用pool
Pool只有一个New成员对象暴露给外面,方法为Get和Put,分别对应是取和存操作:

1.2源码分析

几个关键的结构体

// type Pointer *ArbitraryType
// type ArbitraryType int
type Pool struct {
    noCopy noCopy
    local     unsafe.Pointer 
    localSize uintptr        
    victim     unsafe.Pointer 
    victimSize uintptr        
    New func() any
}
type poolLocal struct {
    poolLocalInternal
    // 不用在意这个,通过对齐操作解决伪共享,高缓是以CacheLine为单位进行缓存的,存在“伪共享”问题;
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

type poolLocalInternal struct {
    private interface{} // 保存一个处理器私有的临时对象,仅能被本地处理器访问
    shared  poolChain   // 并发安全的双向链表,用于保存临时对象,可被所有处理器P访问
}

需要注意的是private和shared的区别,前者只能保存一个
poolChain源码在poolqueue.go中,是一个链表/队列

//头结点和尾节点
type poolChain struct {
    head *poolChainElt
    tail *poolChainElt
}
//节点
//又见到了老朋友,典型的装饰器模式
type poolChainElt struct {
    poolDequeue

    next, prev *poolChainElt
}
func (c *poolChain) pushHead(val any)
func (c *poolChain) popHead() (any, bool)
func (c *poolChain) popTail() (any, bool)

pool的local([]poollocal)的创建在pin()函数中

返回p的[]localpool和id
func (p *Pool) pin() (*poolLocal, int) {
// procPin会将协程和对应的线程设置为不可抢占,防止协程被抢占
    pid := runtime_procPin()
    s := runtime_LoadAcquintptr(&p.localSize) 
    l := p.local                             
    if uintptr(pid) < s {
//返回p的id和p的poollocal地址
        return indexLocal(l, pid), pid
    }
    return p.pinSlow()
}

func (p *Pool) pinSlow() (*poolLocal, int) {
//一次procPin操作必须对应一次procUnpin操作
//即解除绑定,为什么先解除
//因为下面要加一个互斥锁更耗时,所以先释放
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()
    
    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
// p的local不存在则创建
    if p.local == nil {
        allPools = append(allPools, p)
    }
//这个是获取处理器P的数量
    size := runtime.GOMAXPROCS(0)
//这行代码就可以看出是一个poolLocal的切片
//不是一个对象
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    runtime_StoreReluintptr(&p.localSize, uintptr(size))     
//但是返回的是一个本地p的poollocal地址
    return &local[pid], pid
}

pin方法在Get和Put中都有调用,通过阅读需要记住local保存着的是[]poollocal中的一个成员地址

1.2.1Get方法

Get源码

func (p *Pool) Get() any {
//防止执行Get时当前P被其他g抢占
    l, pid := p.pin()
//获取处理器的私有临时对象,poollocal.private
    x := l.private
    l.private = nil
    if x == nil {
//shared也保存着临时对象,
//private拿不到就从shared队列/链表中获取
        x, _ = l.shared.popHead()
        if x == nil {
//也没有就执行getslow方法
            x = p.getSlow(pid)
        }
    }
//pin函数要求必须调用这个函数进行清理
    runtime_procUnpin()
//getslow也拿不到就用New创建
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

getSlow又做了啥?

//返回一个poollocal地址
func indexLocal(l unsafe.Pointer, i int) *poolLocal

func (p *Pool) getSlow(pid int) any {
//获取[]poollocal长度和poollocal地址
    size := runtime_LoadAcquintptr(&p.localSize) 
    locals := p.local       
     //这里indexLocal输入的不是自己的pid,而是其他随机的                     
    // 所以尝试从其他处理器的shared中取偷取
     //偷到就返回,联想到GMP的饥饿模式
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
//popTail是弹出链表最后一个
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
//获取victim的链表长度
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
//indexLocal传入的是自己的p的id
// victim保存着上一轮的poollocal
// 从本地的victim的private中获取
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
//从victim的shared获取
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // victim没有对象获取时,将其长度清零
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}
小结获取顺序 image.png

1.2.2Put方法

没啥难度,跟Get反着来

func (p *Pool) Put(x any) {
//类型检测,nil不给放
    if x == nil {
        return
    }
//获取P的poollocal
//按顺序尝试放入private和shared中
    l, _ := p.pin()
    if l.private == nil {
        l.private = x
    } else {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()

}

1.2.2poolCleanUP方法

开头就说了GC时会清理pool,实际就是调用这个函数poolCleanUp

// 全局变量
var (
    allPoolsMu Mutex

    // allPools保存所有local非空的对象
    // allPools的并发控制手段有:allPoolsMu或STW
    allPools []*Pool

    // oldPools保存所有local为空、victim非空的对象
    // oldPools的并发控制手段有:STW
    oldPools []*Pool
)


func poolCleanup() {
//清空oldpools的victim
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

//清空allpools的local
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    //更换oldpool和allpool
    oldPools, allPools = allPools, nil
}

至于哪个处理器P上的poollocal被清理就不清楚了

2.Once

结构体很简单,一把锁和一个次数标记位

type Once struct {
    done uint32
    m    Mutex
}

Do方法不是立即执行函数

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}
func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

这里使用了对done的双重检查double-check,执行前检查一次,确保要执行“执行函数”这一步操作,在加锁后再确认一次,确保在等待拿锁的这段时间里,没被执行过

参考

1.Pool源码解析
2.golang sync.pool源码分析无锁队列实现

上一篇 下一篇

猜你喜欢

热点阅读