
Cutting Along the Grain: Kotlin Coroutine Suspension and Resumption Explained in One Article

2023-08-31  Android程序员老鸦
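The previous article gave a rough picture of how the coroutine framework runs. We met the scope (CoroutineScope), the context (CoroutineContext), the continuation (Continuation), the interceptor (CoroutineDispatcher) and the coroutine thread pool (CoroutineScheduler), the basic building blocks a coroutine needs to run. By tracing the source code we saw that a coroutine switches threads through the continuous interplay of continuations and the scheduling done by the interceptor's thread pool. But that is clearly not the essence of coroutines. Their real essence is that they let us write asynchronous operations the way we write synchronous code, without traditional callback interfaces.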

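That's right: it is the suspend-and-resume mechanism that made coroutines famous. This mechanism is the heart of coroutines and what makes them practical.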

Suspending and resuming a coroutine

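As I see it, suspension and resumption can be described like this: suspending means that coroutine 1, running on thread A, switches to thread B (in fact a new coroutine 2 is started) to run for a while, usually for a time-consuming operation. Meanwhile the outer coroutine is suspended: thread A does not block waiting, it simply carries on with other work. When coroutine 2 finishes (usually with a result), coroutine 1 is resumed (in fact coroutine 2 actively notifies coroutine 1 that it is time to resume) and continues where it left off. None of this requires a callback interface; everything is written sequentially inside the coroutine body.
This style avoids the old callback hell, but underneath it is still thread switching; only the way the code is written is more natural and elegant.
Many people say coroutines perform better. When the number of coroutines is small, that is not necessarily true. A coroutine can be seen as a framework built on top of threads; in my view its biggest strengths are the shared thread pool and the suspend/resume mechanism.
Here is an example: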

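import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

// Dispatchers.Main needs a platform main dispatcher, e.g. kotlinx-coroutines-android on Android
GlobalScope.launch(Dispatchers.Main) {
    println("launch start in Thread:${Thread.currentThread().name}")
    // Suspension point: withContext is what implements suspension and resumption here
    val withResult = withContext(Dispatchers.IO) {
        println("withContext run in Thread:${Thread.currentThread().name}")
        "withContext success"
    }
    // Resumption point
    println("launch end  in Thread:${Thread.currentThread().name}, withResult = $withResult")
}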

Output:
2023-08-31 18:00:34.193 4994-4994/com.example.coroutinexe I/System.out: launch start in Thread:main
2023-08-31 18:00:34.196 4994-6990/com.example.coroutinexe I/System.out: withContext run in Thread:DefaultDispatcher-worker-1
2023-08-31 18:00:34.199 4994-4994/com.example.coroutinexe I/System.out: launch end  in Thread:main, withResult = withContext success

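This example shows the suspend/resume mechanism nicely: no callback interface, the code runs in sequential order and is easy to read. Next, let's trace withContext to see how it suspends and resumes a coroutine.
Picking up from the previous article, we return to BaseContinuationImpl's resumeWith(result: Result<Any?>) method, which is where invokeSuspend(), i.e. our coroutine body, actually runs: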

BaseContinuationImpl:
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) { // Loop body: keeps calling invokeSuspend, which runs a different segment of the coroutine body depending on its label state
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
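                        // Run the coroutine body
                        val outcome = invokeSuspend(param)
                        // Key point: if the result is COROUTINE_SUSPENDED, return and leave the loop, i.e. stop calling invokeSuspend. This is the suspension point.
                        // So when does invokeSuspend return COROUTINE_SUSPENDED? To answer that we have to look inside invokeSuspend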
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
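The while (true) loop replaces what would otherwise be a chain of nested completion.resumeWith() calls. To make that concrete, here is a deliberately simplified model in plain Kotlin. It is a sketch, not the real implementation: Frame, step, resumeChain and the Suspended marker are hypothetical stand-ins for BaseContinuationImpl, invokeSuspend, resumeWith and COROUTINE_SUSPENDED.

```kotlin
// Hypothetical marker playing the role of COROUTINE_SUSPENDED
object Suspended

// Hypothetical stand-in for BaseContinuationImpl: step plays invokeSuspend,
// completion is the calling frame (null means the top-level completion is reached)
class Frame(
    val name: String,
    val completion: Frame?,
    val step: (Any?) -> Any?
)

// Same shape as BaseContinuationImpl.resumeWith: one loop walks up the chain of callers
// instead of each frame calling its completion recursively
fun resumeChain(start: Frame, initial: Any?, onTopLevel: (Any?) -> Unit, log: MutableList<String>) {
    var current = start
    var param = initial
    while (true) {
        val outcome = current.step(param)
        if (outcome === Suspended) {
            log += "${current.name} suspended"
            return // suspension point: leave the loop, the thread is free again
        }
        log += "${current.name} -> $outcome"
        val completion = current.completion
        if (completion != null) {
            current = completion // "unrolling recursion via loop"
            param = outcome
        } else {
            onTopLevel(outcome) // top-level completion reached
            return
        }
    }
}
```

A frame whose step returns the marker stops the loop immediately, which is the same early return the real code performs for COROUTINE_SUSPENDED.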

The decompiled invokeSuspend method of the launch block:

           int label;
            /*
             * Unable to fully structure code
             * Enabled aggressive block sorting
             * Lifted jumps to return sites
             */
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1_1) {
                var3_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        ResultKt.throwOnFailure((Object)var1_1);
                        System.out.println((Object)Intrinsics.stringPlus((String)"launch start in Thread:", (Object)Thread.currentThread().getName()));
                        this.label = 1;
                        // Key point: what withContext returns here is NOT the "withContext success" defined in our block, don't mix them up. To see what it does return, we need to look at withContext's implementation
                        v0 = BuildersKt.withContext((CoroutineContext)((CoroutineContext)Dispatchers.getIO()), (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super String>, Object>(null){
                            int label;

                            @Nullable
                            public final Object invokeSuspend(@NotNull Object object) {
                                IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch (this.label) {
                                    case 0: {
                                        ResultKt.throwOnFailure((Object)object);
                                        System.out.println((Object)Intrinsics.stringPlus((String)"withContext run in Thread:", (Object)Thread.currentThread().getName()));
                                        return "withContext success";
                                    }
                                }
                                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                            }

                            @NotNull
                            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                                return (Continuation)new /* invalid duplicate definition of identical inner class */;
                            }

                            @Nullable
                            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super String> p2) {
                                return (this.create((Object)p1, p2)).invokeSuspend((Object)Unit.INSTANCE);
                            }
                        }), (Continuation)((Continuation)this));
                        // If the return value equals COROUTINE_SUSPENDED, return COROUTINE_SUSPENDED: exactly the case analyzed in resumeWith above
                        if (v0 == var3_2) {
                            return var3_2;
                        }
                        ** GOTO lbl14
                    }
                    case 1: {
                        ResultKt.throwOnFailure((Object)$result);
                        v0 = $result;
lbl14:
                        // 2 sources

                        withResult = (String)v0;
                        System.out.println((Object)("launch end  in Thread:" + Thread.currentThread().getName() + ", withResult = " + withResult));
                        return Unit.INSTANCE;
                    }
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
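The decompiled output is noisy, so here is a hand-written class with the same shape, using only the standard library's kotlin.coroutines package. It is an illustration under assumptions: LaunchBody, fakeWithContext and the top-level pending variable are hypothetical; the real class is generated by the compiler, and fakeWithContext merely parks the continuation instead of starting a coroutine on another thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Hypothetical slot for the parked continuation (in the real code, DispatchedCoroutine keeps uCont)
var pending: Continuation<Any?>? = null

// Hypothetical callee in "compiled" form: the caller's continuation is an extra parameter,
// and returning COROUTINE_SUSPENDED means "I will resume you later"
fun fakeWithContext(caller: Continuation<Any?>): Any? {
    pending = caller
    return COROUTINE_SUSPENDED
}

// Hand-written counterpart of the compiler-generated class for the launch block
class LaunchBody(private val log: MutableList<String>) : Continuation<Any?> {
    override val context: CoroutineContext = EmptyCoroutineContext
    private var label = 0 // which segment to run next

    fun invokeSuspend(result: Any?): Any? {
        return when (label) {
            0 -> {
                log += "launch start"
                label = 1 // record the resume point before calling the suspending function
                val v = fakeWithContext(this)
                if (v === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // propagate suspension
                finish(v) // callee finished synchronously: go straight to the next segment
            }
            1 -> finish(result) // resumed later: result is the callee's value
            else -> error("call to 'resume' before 'invoke' with coroutine")
        }
    }

    private fun finish(withResult: Any?) {
        log += "launch end, withResult = $withResult"
    }

    // Resuming the continuation re-enters the state machine at the saved label
    override fun resumeWith(result: Result<Any?>) {
        invokeSuspend(result.getOrThrow())
    }
}
```

The first call stops at label 1 and returns COROUTINE_SUSPENDED; a later resumeWith on the parked continuation jumps directly to the second segment, just like case 1 in the decompiled code.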
Suspending the coroutine
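// The context parameter is usually a dispatcher (Dispatchers.X); the block runs on that dispatcher's threads
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // uCont is the continuation of the coroutine that called withContext
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // Build newContext by merging the caller's CoroutineContext with the one passed to withContext
        val oldContext = uCont.context
        val newContext = oldContext + context
        // Check that the coroutine is still active; if it is not, throw a CancellationException
        newContext.checkCompletion()
        ...
        // DispatchedCoroutine is also an AbstractCoroutine; it handles the completion callback.
        // Note that the Continuation passed in is uCont, i.e. the continuation of the coroutine that called withContext
        val coroutine = DispatchedCoroutine(newContext, uCont)
        coroutine.initParentJob()
        // Start the withContext coroutine the same way any coroutine is started.
        // The argument `coroutine` is the DispatchedCoroutine, which holds the continuation to resume.
        // From here on the coroutine running block has been started, possibly on another thread
        block.startCoroutineCancellable(coroutine, coroutine)
        // A result is returned right away, telling the caller whether it must suspend here.
        // If block has not finished yet (the usual case), this is COROUTINE_SUSPENDED, as analyzed above
        coroutine.getResult()
    }
}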

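So what withContext does is call suspendCoroutineUninterceptedOrReturn. Its source is not visible because it is an intrinsic built into the Kotlin compiler. Don't dwell on that; just remember that the uCont parameter is the continuation of the coroutine outside withContext, and it is what will later be used to resume that outer coroutine.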
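To make uCont less abstract, here is a small standard-library experiment. It is a sketch: maybeSuspend, parked and runDemo are hypothetical names. The lambda passed to suspendCoroutineUninterceptedOrReturn receives uCont and either returns a value right away (no suspension) or saves uCont and returns COROUTINE_SUSPENDED, which mirrors what withContext does through DispatchedCoroutine.getResult().

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.startCoroutine

// Hypothetical slot holding the caller's continuation (the role uCont plays in withContext)
var parked: Continuation<String>? = null

// Returns a value directly when ready, otherwise parks uCont and suspends
suspend fun maybeSuspend(ready: Boolean): String =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        if (ready) {
            "returned without suspending"
        } else {
            parked = uCont
            COROUTINE_SUSPENDED
        }
    }

// Starts a coroutine that calls maybeSuspend twice and records what happens
fun runDemo(events: MutableList<String>) {
    val body: suspend () -> Unit = {
        events += maybeSuspend(ready = true)
        events += maybeSuspend(ready = false) // suspends here; startCoroutine returns
        events += "after resume"
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { events += "completed" })
}
```

Whoever holds uCont decides when the coroutine continues: here the calling code resumes it by hand with `parked!!.resumeWith(...)`, while in withContext it is DispatchedCoroutine that does so once the block completes.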
DispatchedCoroutine:

// The three states: undecided, suspended, resumed
private const val UNDECIDED = 0 
private const val SUSPENDED = 1
private const val RESUMED = 2
// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // this is copy-and-paste of a decision state machine inside AbstractionContinuation
    // todo: we may some-how abstract it via inline class
    private val _decision = atomic(UNDECIDED)
    // trySuspend() returns true when it sees UNDECIDED first, i.e. the block has not finished yet (the usual case); the name says it all
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    override fun afterCompletion(state: Any?) {
        // Call afterResume from afterCompletion and not vice-versa, because stack-size is more
        // important for afterResume implementation
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
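    // getResult() returns COROUTINE_SUSPENDED when trySuspend() wins, i.e. the block is still running; otherwise the block already finished and its result is returned directly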
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }
}
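The _decision field settles a race between two parties: the caller of getResult() (the thread that just started the block) and afterResume() (the thread that finished the block). Whoever moves the state away from UNDECIDED first wins. Below is a minimal sketch of that handshake; the Decision class is hypothetical, and it uses java.util.concurrent.atomic.AtomicInteger where the real code uses atomicfu's atomic().

```kotlin
import java.util.concurrent.atomic.AtomicInteger

private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

// Hypothetical re-creation of DispatchedCoroutine's decision logic
class Decision {
    private val state = AtomicInteger(UNDECIDED)

    // Called from getResult(): true means "the caller must suspend"
    fun trySuspend(): Boolean {
        while (true) {
            when (state.get()) {
                UNDECIDED -> if (state.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false // block already finished: return its value directly
                else -> error("Already suspended")
            }
        }
    }

    // Called from afterResume(): true means "getResult() has not run yet and will pick up the value"
    fun tryResume(): Boolean {
        while (true) {
            when (state.get()) {
                UNDECIDED -> if (state.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false // caller already suspended: uCont must be resumed explicitly
                else -> error("Already resumed")
            }
        }
    }
}
```

If getResult() wins, withContext returns COROUTINE_SUSPENDED and afterResume() later has to resume uCont; if afterResume() wins, getResult() simply returns the finished value and nothing is suspended.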

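At this point the suspension path is complete. To recap:
1. While executing, the parent coroutine body reaches the suspend function withContext, which starts its own coroutine to run its block.
2. At the same time, withContext returns COROUTINE_SUSPENDED (provided the block has not already finished).
3. In resumeWith, the parent coroutine sees that the return value is COROUTINE_SUSPENDED and returns. The code after withContext in the parent body is paused, but the thread is not blocked.
4. The parent's coroutine body (a SuspendLambda) has saved its state, with the label recording where it suspended. Now withContext's block is running while the parent coroutine is suspended, waiting to be told to resume.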

Resuming the coroutine

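After the parent coroutine is suspended, how does it get resumed? It certainly does not resume itself; it waits for the suspension point to notify it. So the answer has to be found in withContext's coroutine body: only that body knows when it has finished, and it then notifies the parent coroutine to resume.
So once again we go to BaseContinuationImpl's resumeWith(), which is also where withContext's block is executed: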

BaseContinuationImpl:
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) { 
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // Run the coroutine body. This time it is the block passed to withContext, which normally returns the "withContext success" string we wrote
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // Here completion is the DispatchedCoroutine we created; it is not a BaseContinuationImpl, so the else branch runs
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    // After the block finishes we get here and call DispatchedCoroutine.resumeWith; this is what kicks off resumption at the suspension point
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

DispatchedCoroutine:
    // Inherited from AbstractCoroutine, an ancestor class of DispatchedCoroutine
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
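    // Implemented by DispatchedCoroutine itself
    override fun afterResume(state: Any?) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // Resume in a cancellable way because we have to switch back to the original dispatcher
        // uCont is the parent coroutine's continuation (a SuspendLambda). intercepted() runs it through the
        // interceptor again, i.e. the dispatcher assigns a thread once more, so after resuming, the coroutine
        // does not necessarily run on the thread it used before suspending. Execution then continues in the
        // parent's body from the state saved earlier
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }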
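To see why that intercepted() call matters, here is a small experiment using only the standard library. It is a sketch under assumptions: threadTrace and the "worker" thread are hypothetical, and there is no ContinuationInterceptor in the context, so nothing dispatches the continuation back to the original thread. The coroutine parks its continuation via suspendCoroutine, a worker thread resumes it, and the code after the suspension point then keeps running on the worker.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Records the thread before suspension, the value passed to resume(), and the thread after resumption
fun threadTrace(): List<String> {
    val trace = mutableListOf<String>()
    val finished = CountDownLatch(1)
    var parked: Continuation<String>? = null

    val body: suspend () -> Unit = {
        trace += Thread.currentThread().name
        // Suspend and hand our continuation out, much like withContext hands uCont to DispatchedCoroutine
        val result = suspendCoroutine<String> { cont -> parked = cont }
        trace += result
        trace += Thread.currentThread().name
    }
    // No interceptor in EmptyCoroutineContext: the body runs on this thread until it suspends
    body.startCoroutine(Continuation(EmptyCoroutineContext) { finished.countDown() })

    // startCoroutine has returned, so the body is suspended and its continuation is parked
    val cont = parked!!
    thread(name = "worker") {
        // The worker plays the finished child: it resumes the caller, which continues on this thread
        cont.resume(Thread.currentThread().name)
    }
    finished.await()
    return trace
}
```

In DispatchedCoroutine.afterResume, uCont.intercepted() is what hands the continuation back to the caller's dispatcher instead, which is why the example at the top of the article prints main again after withContext.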

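That is the logic for resuming after a suspension. The key is the clever use of Continuation and the flexible state-machine design; once the pieces are laid out, it is not hard to follow.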
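One more point: suspend functions are usually marked with the suspend keyword, but being marked suspend does not mean a function actually suspends. Only functions such as delay and withContext, which are designed to be able to return the COROUTINE_SUSPENDED marker, can really suspend. The suspend modifier itself only makes the Kotlin compiler add an extra parameter, the caller's continuation, at compile time. And if a function does return COROUTINE_SUSPENDED but nothing ever calls resume on that continuation, no resumption happens and the coroutine stays suspended forever.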
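Both halves of that claim can be checked with the standard library. In this sketch, noRealSuspension, suspendForever and rawCall are hypothetical helpers; rawCall uses startCoroutineUninterceptedOrReturn from kotlin.coroutines.intrinsics, which runs a suspend lambda until its first suspension and returns either its result or COROUTINE_SUSPENDED, so we can see which one came out.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn

// Marked suspend, but never returns COROUTINE_SUSPENDED: it just computes a value
suspend fun noRealSuspension(): Int = 6 * 7

// Returns COROUTINE_SUSPENDED and drops uCont, so nothing can ever resume the caller
suspend fun suspendForever(): Int = suspendCoroutineUninterceptedOrReturn { COROUTINE_SUSPENDED }

// Starts a suspend lambda the way compiled code calls it and returns the raw result
fun rawCall(block: suspend () -> Int): Any? =
    block.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { })
```

`rawCall { noRealSuspension() }` hands back 42 directly, while `rawCall { suspendForever() }` hands back COROUTINE_SUSPENDED, and since the continuation was never stored, that coroutine can never continue.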
The next article will take a closer look at the coroutine thread pool.
