Golang 中 GPM 之 G 的执行(一、准备阶段)

2020-10-04  本文已影响0人  未tu

从之前的文章《Golang 中 GPM 之 G 从哪里来》,我们知道了 Golang 对于使用 G 是有一定的复用机制,这样减少了资源的浪费,接下来我们要看下 G 生成后会怎么样

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
  ...
  newg := gfget(_p_)
  if newg == nil {
    newg = malg(_StackMin)
    casgstatus(newg, _Gidle, _Gdead) //这里将新建的 G 中对应的状态是从 Gidle -> Gdead 中 
    ...
  }
  // 以上我们得到了这次新产生的 G 
  ...
  newg.startpc = fn.fn
  ...
  casgstatus(newg, _Gdead, _Grunnable) // 当新的 G 初始化,并将其状态变更  Gdead -> Grunnable
  // 并将新的 G 放到当前 G 运行的 M 对应的 P 中
  runqput(_p_, newg, true)
  ...
}

那接下来的重点就在 runqput(_p_ *p, gp *g, next bool)

// runqput 优先将 G 放在本地的可运行队列
// 如果 next 是 false ,则加入可运行队列的队列的尾部
// 如果 next 是 true ,则 runqput 将 G 放到当前 P 下一个执行的位置
// 如果当前 P 队列已经满了,则会将当前队列的 “一半+1” 放到 G 的全局队列中
func runqput(_p_ *p, gp *g, next bool) { 
  // 这里表示如果是随机的,则不将新产生的 G 放到当前 P 的 runnext
  if randomizeScheduler && next && fastrand()%2 == 0 {
    next = false
  }
​
  // 判断是否将新增的 G 放到当前 P 的 runnext,
  // 后续将被替换出来的 G 放到本地 G 队列的尾部
  if next {
  retryNext:
    oldnext := _p_.runnext
    if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
      goto retryNext
    }
    if oldnext == 0 {
      return
    }
​
    gp = oldnext.ptr()
  }
​
retry:
  // atomic.LoadAcq 获取上次 atomic.CaSRel 的 _p_.runqhead 的值
  h := atomic.LoadAcq(&_p_.runqhead) 
  t := _p_.runqtail 
  // 这里的 _p_.runq 的数据类型是[256]guintptr,说明总计一个 P 最多
  // 有 256 个 G 放在本地可运行队列中和 1 个接下来运行的 G(runnext)
  if t-h < uint32(len(_p_.runq)) { 
    _p_.runq[t%uint32(len(_p_.runq))].set(gp)//通过 % 说明这是一个循环队列
    atomic.StoreRel(&_p_.runqtail, t+1) //将末尾的位置 +1
    return
  }
  //如果队列满了,将当前 P 本地可运行队列的一半以及 oldnext 放到全局队列中
  if runqputslow(_p_, gp, h, t) {
    return
  }
​
  goto retry
}

runqputslow(_p_ *p, gp *g, h, t uint32) 的实现细节可以看下面

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
  //这里的长度取的是 p 本地可运行队列总长的一半 +1,即 256/2+1
  var batch [len(_p_.runq)/2 + 1]*g 
​
​
  n := t - h
  n = n / 2
  if n != uint32(len(_p_.runq)/2) {
    throw("runqputslow: queue is not full")
  }
  // 这个 for 循环次数就是可运行队列总量的一半
  for i := uint32(0); i < n; i++ {
    batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
  }
  
  // 这里根据 _p_.runqhead 判断是否还是老位置,
  // 如果是的话,则将起始位置增加 n ,如果失败则说明起始位置有变更过
  if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
    return false
  }
  batch[n] = gp
​
  if randomizeScheduler { // 这里只是将 batch 里面的 G 来进行随机排序
    for i := uint32(1); i <= n; i++ {
      j := fastrandn(i + 1)
      batch[i], batch[j] = batch[j], batch[i]
    }
  }
​
  // 这里的意思是将排序后的 batch 用链表的形式绑定起来
  for i := uint32(0); i < n; i++ {
    batch[i].schedlink.set(batch[i+1])
  }
  var q gQueue 
  q.head.set(batch[0])
  q.tail.set(batch[n])
  lock(&sched.lock)
  
  // 这里是将这个 batch 放到全局可运行队列(这使用了批量转移),
  // 且将 batch 放在全局可运行队列的尾部
  globrunqputbatch(&q, int32(n+1)) 
  
  unlock(&sched.lock)
  return true
}

总结:
第一步:生成一个状态为 _Grunnable 的新 G
第二步:将新 G 放到当前 G 相关的 P 的下一个运行位(即直接插队成下一个要执行的G,替换出来的 G 就成为 oldnext)
第三步:根据 oldnext 的情况
(1)如果 oldnext 本来就没有,则结束
(2)如果有,则接下去后面的步骤
第四步:根据当前 P 的本地可运行队列是否满了
(1)如果未满,则将 oldnext 放到本地可运行队列的后面
(2)如果满了,则将当前 P 本地可运行队列队列的一半以及当前的 oldnext 放到一起放到全局可运行队列的尾部

上一篇下一篇

猜你喜欢

热点阅读