GO阅读-Sync包-Pool和Once
2023-03-26 本文已影响0人
温岭夹糕
环境
1.Pool
1.1使用和注意事项
一般考虑缓存资源(复用内存,最主要是减少GC压力,减少CPU资源,因为内存分配和GC都是CPU密集操作),如创建好的对象,可以使用pool
Pool只有一个New成员对象暴露给外面,方法为Get和Put,分别对应是取和存操作:
- Get检查自己是否有资源,有则直接返回,没有就执行New里面的方法创建新的
- Put就是放入
- 需要注意的一点是sync.Pool会在GC(垃圾回收)的时候释放资源,也就是说经过一次gc后,pool里面到底还存在多少个是不确定的,无法控制的
- 当Pool里面存放结构体实例时,建议给实例增加一个reset重置方法,避免下次被拿到时是不干净的,出现未知结果
1.2源码分析
// type Pointer *ArbitraryType
// type ArbitraryType int
type Pool struct {
noCopy noCopy
local unsafe.Pointer
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
New func() any
}
- noCopy是给编译器看的,防止pool被值复制,即a=pool,a=b;
- local是一个指向[]poolLocal类型指针(实际上是其中一个poollocal的地址),用于保存临时对象,一个处理器P对应一个poolLocal,一般来说有几个处理器,[]poollocal就存放几个poollocal
- localSize顾名思义,[]poollocal的长度
- victim 也是[]poolLocal类型的指针,pool会定期调用poolcleanup函数进行清理操作,victim保存上一轮的local指针地址
- New为新对象的创建方法
type poolLocal struct {
poolLocalInternal
// 不用在意这个,通过对齐操作解决伪共享,高缓是以CacheLine为单位进行缓存的,存在“伪共享”问题;
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{} // 保存一个处理器私有的临时对象,仅能被本地处理器访问
shared poolChain // 并发安全的双向链表,用于保存临时对象,可被所有处理器P访问
}
需要注意的是private和shared的区别,前者只能保存一个
poolChain源码在poolqueue.go中,是一个链表/队列
//头结点和尾节点
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
//节点
//又见到了老朋友,典型的装饰器模式
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
func (c *poolChain) pushHead(val any)
func (c *poolChain) popHead() (any, bool)
func (c *poolChain) popTail() (any, bool)
pool的local([]poollocal)的创建在pin()函数中
返回p的[]localpool和id
func (p *Pool) pin() (*poolLocal, int) {
// procPin会将协程和对应的线程设置为不可抢占,防止协程被抢占
pid := runtime_procPin()
s := runtime_LoadAcquintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
//返回p的id和p的poollocal地址
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func (p *Pool) pinSlow() (*poolLocal, int) {
//一次procPin操作必须对应一次procUnpin操作
//即解除绑定,为什么先解除
//因为下面要加一个互斥锁更耗时,所以先释放
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// p的local不存在则创建
if p.local == nil {
allPools = append(allPools, p)
}
//这个是获取处理器P的数量
size := runtime.GOMAXPROCS(0)
//这行代码就可以看出是一个poolLocal的切片
//不是一个对象
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size))
//但是返回的是一个本地p的poollocal地址
return &local[pid], pid
}
pin方法在Get和Put中都有调用,通过阅读需要记住local保存着的是[]poollocal中的一个成员地址
1.2.1Get方法
func (p *Pool) Get() any {
//防止执行Get时当前P被其他g抢占
l, pid := p.pin()
//获取处理器的私有临时对象,poollocal.private
x := l.private
l.private = nil
if x == nil {
//shared也保存着临时对象,
//private拿不到就从shared队列/链表中获取
x, _ = l.shared.popHead()
if x == nil {
//也没有就执行getslow方法
x = p.getSlow(pid)
}
}
//pin函数要求必须调用这个函数进行清理
runtime_procUnpin()
//getslow也拿不到就用New创建
if x == nil && p.New != nil {
x = p.New()
}
return x
}
//返回一个poollocal地址
func indexLocal(l unsafe.Pointer, i int) *poolLocal
func (p *Pool) getSlow(pid int) any {
//获取[]poollocal长度和poollocal地址
size := runtime_LoadAcquintptr(&p.localSize)
locals := p.local
//这里indexLocal输入的不是自己的pid,而是其他随机的
// 所以尝试从其他处理器的shared中取偷取
//偷到就返回,联想到GMP的饥饿模式
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
//popTail是弹出链表最后一个
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
//获取victim的链表长度
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
//indexLocal传入的是自己的p的id
// victim保存着上一轮的poollocal
// 从本地的victim的private中获取
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
//从victim的shared获取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// victim没有对象获取时,将其长度清零
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
小结获取顺序
image.png
1.2.2Put方法
没啥难度,跟Get反着来
func (p *Pool) Put(x any) {
//类型检测,nil不给放
if x == nil {
return
}
//获取P的poollocal
//按顺序尝试放入private和shared中
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
l.shared.pushHead(x)
}
runtime_procUnpin()
}
1.2.2poolCleanUP方法
开头就说了GC时会清理pool,实际就是调用这个函数poolCleanUp
// 全局变量
var (
allPoolsMu Mutex
// allPools保存所有local非空的对象
// allPools的并发控制手段有:allPoolsMu或STW
allPools []*Pool
// oldPools保存所有local为空、victim非空的对象
// oldPools的并发控制手段有:STW
oldPools []*Pool
)
func poolCleanup() {
//清空oldpools的victim
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
//清空allpools的local
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
//更换oldpool和allpool
oldPools, allPools = allPools, nil
}
至于哪个处理器P上的poollocal被清理就不清楚了
2.Once
结构体很简单,一把锁和一个次数标记位
type Once struct {
done uint32
m Mutex
}
Do方法不是立即执行函数
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
这里使用了对done的双重检查double-check,执行前检查一次,确保要执行“执行函数”这一步操作,在加锁后再确认一次,确保在等待拿锁的这段时间里,没被执行过