Go

调度器——GMP 调度模型

2021-07-18  本文已影响0人  沉睡的木木夕

调度器——GMP 调度模型

Goroutine 调度器,它是负责在工作线程上分发准备运行的 goroutines。

首先在讲 GMP 调度模型之前,我们先了解为什么会有这个模型,之前的调度模型是什么样子的?为什么要改成现在的模式?

我们从当初的Goroutine 调度设计文档得知之前采用了 GM 的调度模型,并且在高并发测试下性能不高。文中提到测试显示 Vtocc 服务器在 8 核机器上的CPU最高为70%,而文件显示 rutime.futex() 就消耗了14%。通常,在性能至关重要的情况下,调度器可能会禁止用户使用惯用的细粒度并发。

那么是什么原因导致这些问题呢?Dmitry Vyukov 总结四个原因:

为了解决这个问题,于是就引入了 Processor 这个概念。引入了这个对象并不会因为多了一个对象开销性能都会有影响,反而这方面开销都下降了。P 其实负责的是 M 与 G 之间的调度相关的操作,在执行 G 时 P 一定要与 M 绑定。并且把 M,schedule 里面的对象都转移到 P 中去了,所以 M 与 调度器原来的操作反而变得更干净了。如调度设计文档中提到的:当 M 准备执行 Go 代码时会从集合表中弹出一个 P;当执行代码结束后就会将 P 推进集合中。所以当 M 需要执行 Go 代码时,必须要与 P 绑定。而新增的这个机制,就是为了替代原来调度器中的 sched.atomic(mcpu/mcpumax)。

在设计文档中还讲到了调度时发生系统启动、挂起、以及恢复(Syscall Park/Unpark)的指导方针,并且在后面的调度实现提供了依据。

从创建 G 就必须要确保由其他的 M 在执行 G。同样当 M 进入系统调用时就必须要确保由其他的 M 来执行 Go 代码。

这时有两种选择,要么立即阻塞/解塞,要么自旋。关于自旋有两种级别:

  1. 空闲的 M 同一个相关联的 P 自旋一段时间来寻找新的 G
  2. 一个 M 与一个相关联的 P 自旋等待可用的 G

存在类型为上述 2 的空闲 M 时,类型为 1 的空闲 M 不会阻塞。当产生一个新 G 或 M 进入 Syscall 又或是 M 从空闲转位忙碌时,能确保至少有一个 M 可以执行。这样就避免了过多的 M 阻塞/解塞。

现在的调度模型主要分为三个概念:

P 必须要绑定到 M 上来执行具体的 Go 代码。

在讲 GMP 调度模型之前,我们先来了解以下 G、M、P 这三个对象有哪些核心变量。

G

Goroutine 是建立在 M 内核线程之上的称为协程的一个执行单元。在切换 G 时都是直接在用户态发生的,所以开销很小。所占用的内存也比原来小了很多,从前面的内容我们知道,我们把其中某些元素放入至新引入的 P 中了。虽然占用的内存不大,但是里面的变量却非常多。我们目前了解其中相对重要的部分,其它的字段想进一步了解,可以直接查看 runtime2.g 源码.

type g struct {
    stack       stack   // offset known to runtime/cgo
    stackguard0 uintptr // offset known to liblink
    stackguard1 uintptr // offset known to liblink
    ...
}
type g struct {
    ...
    m            *m
    sched        gobuf
    param        unsafe.Pointer
    atomicstatus uint32
    schedlink    guintptr
    gopc           uintptr
    startpc        uintptr
    waiting        *sudog
    ...
}
type g struct {
    ...
    preempt       bool // 抢占信号, 与 stackguard0 = stackpreempt 一样
    preemptStop   bool // 抢占状态更改为 _Gpreempted
    preemptShrink bool // shrink stack at synchronous safe point
    ...
}

这三个变量是跟抢占调度相关的。

M

M 是指操作系统线程,Go 在启动时会根据 CPU 的核心数分配 M 的个数。最多会开启 10000 个线程,并且这里面大多数都不会执行用户代码。最多只有 GOMAXPROCS 个活跃的线程执行用户代码。默认的设置一般都是 CPU 的核心数,这样是为了在调度的时候防止线程频繁的发生上下文切换。而在调度 G 的所有过程都是在用户态进行的,较于操作系统级的线程 M 切换来说开销会小的多。

我们来看一下 M 的主要核心对象:

type m struct {
    g0      *g
    curg          *g
    ...
}

g0 是个特殊的 goroutine,它是持有调度栈的,它会参与调度的过程。如创建 m,创建 g 以及执行一些内存分配。

type m struct {
    p             puintptr // attached p for executing go code (nil if not executing go code)
    nextp         puintptr
    oldp          puintptr // the p that was attached before executing a syscall
    ...
}

这三个字段是与 P 处理器相关的;

P

是新引入的在 G 和 M 之间的调度层。它负责调度 runq 等待队列中的待运行的协程,在关键的操作时候可以选择让出线程,提高线程利用率。

P 内部也包含了大量对象,同样我们主要了解其中相对重要的字段,与调度那些等待的 g 密切相关的内容。

type p struct {
    m           muintptr
    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext guintptr
    ...
}

前面提到了在执行具体 go 代码时,p 一定要与 m 相关联。后续的字段都是与运行的 goroutine 相关。

其实从这四个字段我们就能看出,runq 其实一个由数组结构加双指针构成的一个环形队列结构

除此之外,还有一个匿名结构类型的字段需要注意,那就是 gFree。这个对象内部是由 gList、n 组成的一个链表对象,用来存放空闲 g 的。

gFree struct {
    gList
    n int32
}

启动 Schedule 调度器

在调度 GMP 之前我们必须还要知道调度器是如何启动的。

调度器启动在 runtime.schedinit 可以看得到。除去初始化锁的顺序信息和其它必要的信息(如gc、栈、系统参数与环境变量等),我们主要看下面几个变量:

func schedinit() {
    ...
    _g_ := getg()
    if raceenabled {
        _g_.racectx, raceprocctx0 = raceinit()
    }
    sched.maxmcount = 10000
    ...
    lock(&sched.lock)
    sched.lastpoll = uint64(nanotime())
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    unlock(&sched.lock)
    ...
}

调度器最多只能开启 10000 个线程。如果设置了 GOMAXPROCS 则替换默认的 cpu 核心数。之后就会调用 procresize 对 proc 进行更改。这个时候调度器必须要上锁,不会执行任何 goroutine 代码。procresize 函数内部对全局变量 allp 的期望容量 capcity 与 procs 进行判断。如果目标值要比期望值大,则会进行扩容给。否则直接追加即可:

func procresize(nprocs int32) *p {
    if nprocs > int32(len(allp)) {
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        ...
        unlock(&allpLock)
    }
    ...
}

扩容之后就会循环初始化 p(初始化期间对 pp 的id、status 以及内存缓存赋值), 并调用底层系统的写屏障(write barrier)确保安全的对 allp 进行覆盖。

在初始化阶段,p 的状态此时是 _Pgcstop。在初始化之后如果当前的 p 的序号是小于之前设置的 nproc 目标数时,就会将当前的 g.m.p 的状态更改为 _Prunning。如果不满足上述条件,则会恒定取全局的 allp 中的第一个,并将状态设置为 _Pidle。

设置完当前的 g.m.p 信息之后就会对一些不再引用的对象进行清理、压缩以及将除 allp 集合中的第一个 p 之外将状态全部置为 _Pidle,并将其放入调度器 sched.pidle 全局空闲队列中去。

func procresize(nprocs int32) *p {
    ...
    mcache0 = nil
    // release resources from unused P's
    for i := nprocs; i < old; i++ {
        p := allp[i]
        p.destroy()
        // can't free P itself because it can be referenced by an M in syscall
    }
    // Trim allp.
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        idlepMask = idlepMask[:maskWords]
        timerpMask = timerpMask[:maskWords]
        unlock(&allpLock)
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        if runqempty(p) {
            pidleput(p)
        } else {
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    ...
    return runnablePs
}

小结

调度器启动总起来就是如下步骤:

新建 Goroutine

其实我们可以从一个例子着手,查看 go 是如何启动一个 goroutine 的

func startg() {
    go func() {
        fmt.Println("start g")
    }()
}

在启动 main.go 的时候,runtime 会执行 proc.go.main 方法创建主协程,并初始化一些信息以及 gc 相关的标识等操作。我们可以通过

go build -gcflag -S startg.go 命令能查看,编译器调用了 runtime.newproc(SB),这个方法有两个参数,一个是参数的大小,另一个是 goroutine 要执行的函数体。

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        newg := newproc1(fn, argp, siz, gp, pc)
        _p_ := getg().m.p.ptr()
        runqput(_p_, newg, true)
        if mainStarted {
            wakep()
        }
    })
}

newproc 方法主要就是保存这两个参数的信息以及对应的程序计数器 pc。然后会根据这些变量来新生成一个 g,然后把这个新生成的 g 推送到当前 g 上的线程的处理器 p 的局部 runq 队列中,然后根据特定的条件(mainStarted)来决定是否唤醒。

newproc1 除了一些栈空间大小的判断以及参数、调度器的内存地址拷贝之外,主要执行了如下功能:

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
    _g_ := getg()
    ...
    acquirem()
    ...
    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    ...
    if narg > 0 {
        memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
        if writeBarrier.needed && !_g_.m.curg.gcscandone {
            f := findfunc(fn.fn)
            stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
            if stkmap.nbit > 0 {
                // We're in the prologue, so it's always stack map index 0.
                bv := stackmapdata(stkmap, 0)
                bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
            }
        }
    }
    ...
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg, false) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    casgstatus(newg, _Gdead, _Grunnable)
    ...
    releasem(_g_.m)
    return newg
}

上面的代码我省略了其它不在考虑的代码。在创建新 g 之前献给 m 上锁了防止被抢占,因为后续要对当前的 m 相关的 p 下的局部队列保存 g。

在创建 newg 的时候首先会调用 gfget(p) 从当前 p 下的 gFree 局部队列中获取空闲的 g(状态为 Gdead),如果局部队列中没有的话,就从调度器 sched 的全局队列中窃取空闲的 g。

如果发生了窃取,那么就会在第一次窃取时就把调度器 sched 中的空闲 g 的批次的全部窃取到自己的局部队列中直到局部队列满(n = 32)。

如果全局队列中也没有 g 的话。那么就会调用 malg(_StackMin) 根据传入的栈大小生成 newg。然后就会调用 memmove 指令将数据以及 fn 信息拷贝到栈上。最后就会将前面保存的栈指针以及 fn 程序计数器等信息保存在 newg 上,并更改 newg 状态由 _Gdead 转变为 _Grunnable。最后释放 g.m 并返回 newg。

新生成并返回的 newg 最终会由 runqput 推送至当前 p 下的队列中。

func runqput(_p_ *p, gp *g, next bool) {
    ...
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }
retry:
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

runqnext 会根据传入的 next 参数决定走两个分支:

runqputslow 在 p 的局部队列满的情况下,负责取出队列中的一部分以及待加入的新 g 添加到调度器的全局运行队列上。

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g
    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    ...
    batch[n] = gp
    // Link the goroutines.
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }
    ...
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])

    // Now put the batch on global queue.
    lock(&sched.lock)
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)
    return true
}

要注意,globrunqputbatch 在添加全局队列 sched.runq 前后是要加锁的防止并发修改这个共享的全局变量。

注意:关于 p 本地运行队列 runq 和调度器 sched 的运行队列 runq 同样都是链表,但是组成的结构完全不一样。

p 的 runq 是通过数组+双指针形成的环形队列。

sched 的 runq 就是单纯的链表结构

调度

在执行完 schedinit 之后就会调用创建 M 的入口函数 runtime.mstart,前者内部会调用 runtime.mstart1。前者主要初始化 g0 的 stackguard0 和 stackguard1 字段。后者会初始化线程 m 以及 m0 独有的逻辑(信号处理程序)。最后会开始调用 runtime.schedule 进行调度。

schedule 主要工作就是创建 g。但是创建 g 的过程非常复杂,在调度 g 之前进行了多次判断:

  1. 首先判断是否因为 gc 等待,如果是因为 gc 就等待 gc 结束。
  2. 判断是否执行安全点函数。

剩下就要针对各种情况对 g 进行赋值:

  1. 如果存在 gc 标记作业,那么就得去 gc 控制器中尝试获取可运行的 g。
  2. 3 没有成功获取,则通过一定的算法尽量公平的(魔法数字 61)通过 runtime.globrunqget 先尝试从全局可运行队列中获取 g,在返回的同时还会将全局队列中的待运行的 g 一次性取一份到当前 p 下的局部队列中。
  3. 如果全局队列中没有 g,则去当前线程下的 p 的 runq 队列中获取
  4. p 局部队列中也没有,那么就会同步调用 runtime.findrunnable 等待返回一个可运行的 g。
    • 这部分的功能非常复杂,总的来说就是去其他 P 的局部或全局运行队列中窃取可运行的 g
    • 轮训网络查看是否有 g 等待运行
    • 在返回可运行的 g 之前还会判断线程自旋是否超过了正在活跃的线程数,超过了就阻塞,来避免 CPU 的负担过大。
    • 在循环窃取的时候能还通过窃取计数器去传递窃取的次数
  5. 最终返回可运行的 g 去执行调度 runtime.execute

最后就会调用把获取的 g 并把其中的调度器信息传递给 runtime.gogo(asm-amd64)。在这个函数中会根据传递的调度信息中的 gobuf 来获取程序计数器 pc 与栈指针以及对应的上下文信息。这样就可以根据这些数据恢复到要执行的 fn 对应的地址来继续执行。

关于切换 g 然后根据获取的 pc、sp 以及 context 恢复在切换 g 之前的位置继续往下执行的过程,熟悉 c# async await 状态机切换函数调用的过程的同学可以将此等同(不负责任)

最后就会调用 runtime.goexit 退出,退出之后最终会在 g0 的栈上找到 runtime.goexit0 该函数,将 goroutine 的状态置为 _Gdead,并清理它的字段信息、取消与 m 的关系、移除 g 并调用 runtime.gfput 推送到当前 p 下的空闲列表 gFree 列表中。然后又重新调用 runtime.schedule 进行一下轮调度。

所以这个调度是一个不断循环上述的调度过程

注意:⚠️⚠️⚠️

关于 runtime.gogo 中最后会调用 runtime.goexit 的说法我参考了 dravenss《Go 语言程序与设计》中 6.5 调度器的说法,我在汇编代码中并没有找到这种说法的痕迹(也是因为没有看懂这里)。

至此,整个由调度器启动到创建初始化线程再到获取或创建 G 再将 P 与之要执行的线程关联最后到结束至下一轮调度循环的过程就分析完了。其实这里面还有很多被我忽略的细节,比如还有大量的 trace 的功能、以及每个对象的对于栈 size 的重计算、等待及等待队列 runtime.sudog 、自旋的细节都没有覆盖到。一是涉及到这些会让我了解并分析调度过程变得更加复杂,二是那些跟 GMP 调度关系不是很大。

小结

关于调度器的过程就是当一个新的 G 或已存在的 G 成为可运行状态时,它就会被推送至可运行的携程集合 runq 中。当 P 执行完 G 时就会从可运行 runq 集合中弹出 G 作为一下个要运行的 G。如果集合中没有就会去其它 P 下的 runq 队列中获取 G;如果还没有就去全局的 runq 链表中获取。一次窃取多个到自己 P 下的 runq 中。

但凡涉及到全局队列操作时都要上锁保证当前队列不被其他线程更改。

参考链接

上一篇下一篇

猜你喜欢

热点阅读