【译文】扒一扒Kotlin协程的幕后实现

2023-02-15  本文已影响0人  代码我写的怎么

前言

如果你能看完本文并把所有内容都弄懂,你对协程的理解也已经超过大部分人了。

Coroutines是近几年在Kotlin上Google主推的异步问题解决方案,至少在Android R Asynctask被放弃后,打开Android Document看到最显目的提示项目就是导引你至Coroutine的页面教导你怎么使用Coroutine。

Emm….那如果把所有问题简单化,其实大多数碰上异步问题,解决的办法基本上都是callback。

fun longTimeProcess(parameter1 : String, callBack:CallBack<String>){
    val result = ""
    //Do something long time
    callBack.onResponse(result)
}

其实我们一般执行回调,本质上都是callback,所以很多异步解决方案,追根溯源,会发现他的实现方式仍旧是callback。

不过callback的使用情境、context还有许许多多的用法情况都不同,整体概念也会有出入,所以我们会需要更多的名词来代表这样的情况,因此延伸出更多样的词汇,不过这段就题外话了。

话说回来,上面那段简易的callback,换作是Coroutine会变成是这样:

suspend fun longTimeProcess(parameter1:String):String{

val result =“”

//Do something long time

return result

}

这样写的好处是可以不用自己控制Thread的使用,上面的代码如果直接在主线程执行,可能会造成主线程卡顿,超过5秒喷Exception直接让Process out,所以还会需要额外自己开thread + handler或是使用Rxjava之类第三方套件去处理。换作是Coroutine,使用起来就简单很多了,被suspend修饰的函数longTimeProcess,有自己的作用域(Scope),用scope launch里头执行该function,利用这个function回传的数据做该在main thread上解决的事情,问题解决,就是如此的简单。

那问题来了。

Coroutine到底是怎么运作的?究竟是甚么神奇的魔法让他可以这么的方便可以不用写那么多东西呢?

记得某次面试里有提到这个问题,但我只知道他是个有限状态机,然后就…

恩,我那时的表情应该跟King Crimson有那么几分神似就是了。

Coroutine概念

维基百科上其实有解释了Coroutine的实作概念:

var q := new queue
coroutine produce
    loop
        while q is not full
            create some new items
            add the items to q
        yield to consume

coroutine consume
    loop
        while q is not empty
            remove some items from q
            use the items
        yield to produce

概念是,这有个queue是空的,那是先跑coroutine product还是coroutine consume其实无所谓,总之随便跑一个,先从coroutine product开始好了。

coroutine produce在queue没满时,会产生一些items,然后加入queue里头,直到queue满为止,接着把程序让给coroutine consume。

coroutine consume在queue不是空的时候,会移除(消费)一些items,直到queue空为止,接着把程序让给coroutine produce,如此反复,这个世界的经济得以维持。

那这边可以看出,当coroutine produce碰到queue是满的时候会直接把程序让给coroutine consume;相对的,若coroutine consume在碰到queue是空的时候,会直接把程序让给coroutine produce

那么,以Kotlin Coroutine来说,queue的是空是满的条件会变成是method的状态是否suspend,那因为上面这个程序很明显会是无限循环,多数我们在开发时会不需要无限的循环,那怎么样才能让这种来回传接球的形式有个终点呢?

答案就是有限状态机,接下来这篇文章会慢慢地解释。

有这么个东西叫做 Continuation

很多时候,原本很麻烦的事情突然变得简单了,其实不是什么都不用做,而是事情有人帮你做了,Coroutine也是,它帮你把写一堆callback的麻烦事给做掉了。

等等,Compiler把写一堆的callback的麻烦事给做掉了,那意思是…

没错,Coroutine本质上还是callback,只是编译器帮你写了。

我本来是想说从CoroutineScope.Launch下去追的,追到IntrinsicsJvm,这东西叫Intrinsic这东西有很大的机率是给编译器用的,追到这里,大概就可以知道,suspend fun会在编译的过程转成Continuation.

但后来换个方向去想,其实也不用这么麻烦,因为Kotlin是可以给Java呼叫的,那Java比较少这种语法糖转译的东西,也就是说,透过Java呼叫suspend fun,就可以知道suspend fun真正的模样。

这边先随便写一个suspend fun。

suspend fun getUserDescription(name:String,id:Int):String{
    return ""
}

在 Java 中调用的時候是如下这样:

instance.getUserDescription("name", 0, new Continuation<String>() {
    @NotNull
    @Override
    public CoroutineContext getContext() {
        return null;
    }

    @Override
    public void resumeWith(@NotNull Object o) {

    }
});
return 0;

我们可以看到,其实suspend fun就是一般的function后头加上一个Continuation

总之得到一个线索,这个线索就是Continuation,它是个什么玩意呢?

它是一个 interface

public interface Continuation<in T> {
    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

它代表的是CoroutinerunBlocksuspend状态中,要被唤醒的callback

那注意这边提到状态了,大伙都知道Coroutine会是个状态机,那具体是怎么个状态呢?这个稍后提。

那如果硬要在java file里头使用GlobalScope.launch,那会长成这样:

BuildersKt.launch(GlobalScope.INSTANCE,
        Dispatchers.getMain(),//context to be ran on
        CoroutineStart.DEFAULT,
        new Function2<CoroutineScope,Continuation<? super Unit>,String>() {
            @Override
            public String invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {

                return "";
            }
        }
);

这样就行了吗?这样好像没啥效果最后会回一个空字串就是了,但这里就会发现,如果用lanuch会需要用到一个Function去传递一个continuation。这样看还是蒙,没关系,咱们继续看下去。

Continuation到底怎么运行?

那这边简单用一个suspend:

fun main() {
    GlobalScope.launch {
        val text = suspendFunction("text")
        println(text) // print after delay
    }
}
suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){

    val result = doSomethingLongTimeProcess(text)
    result
}

Kotlin Bytecodedecompile 會得到這個:

public static final void main() {
   BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
      private CoroutineScope p$;
      Object L$0;
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         Object var10000;
         CoroutineScope $this$launch;
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            $this$launch = this.p$;
            this.L$0 = $this$launch;
            this.label = 1;
            var10000 = CoroutineTestKt.suspendFunction("text", this);
            if (var10000 == var5) {
               return var5;
            }
            break;
         case 1:
            $this$launch = (CoroutineScope)this.L$0;
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }

         String text = (String)var10000;
         boolean var4 = false;
         System.out.println(text);
         return Unit.INSTANCE;
      }

      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         var3.p$ = (CoroutineScope)value;
         return var3;
      }

      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), 3, (Object)null);
} 

另外一个是 suspendFunctiondecompile code

public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
   return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
      private CoroutineScope p$;
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            CoroutineScope $this$withContext = this.p$;
            String result = CoroutineTestKt.doSomethingLongTimeProcess(text);
            return result;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
      }

      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         var3.p$ = (CoroutineScope)value;
         return var3;
      }

      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), $completion);
}

字节码反编译成 Java 这种事,我们干过很多次了。跟往常不同的是,这次我不会直接贴反编译后的代码,因为如果我直接贴出反编译后的 Java 代码,估计会吓退一大波人。协程反编译后的代码,逻辑实在是太绕了,可读性实在太差了。没关系,我们直接梳理解释一下流程。

反编译代码中我们看到一个 switch(this.label) , 这就是大名鼎鼎的 Coroutine状态机了,Kotlin编译器会在编译时产生一个label,这个label就是runBlock里边执行到第几段的状态了。

那具体会有几个状态呢?其实在runBlock里边有几个suspend就会对应有几个状态机,举个例子:

GlobalScope.launch {
        test()
        test()
        test()
        test()
}
fun test(){}

如上代码会有几个呢?

答案是一个,因為這 test() 不是挂起函数(suspend function),它不需要挂起操作(suspended)。

如果换成是这样?

GlobalScope.launch {
        test()
        test()
        test()
        test()
}
suspend fun test(){}

答案是五个。

GlobalScope.launch {
        // case 0
        test() // case 1 receive result
        test() // case 2 receive result
        test() // case 3 receive result
        test() // case 4 receive result
}

因為四个 test() 都有可能获得 suspended 的状态,所以需要五个执行状态的,case 0 用于初始化,case 1– 4 用于结果获取。

那状态何时会改变呢?

答案是:invokeSuspend 执行时。

label34: {
   label33: {
      var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure($result);
         $this$launch = this.p$;
         this.L$0 = $this$launch;
         this.label = 1;
         if (CoroutineTestKt.test(this) == var3) {
            return var3;
         }
         break;
      case 1://...ignore
         break;
      case 2://...ignore
         break label33;
      case 3://...ignore
         break label34;
      case 4://...ignore
         return Unit.INSTANCE;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      this.L$0 = $this$launch;
      this.label = 2;
      if (CoroutineTestKt.test(this) == var3) {
         return var3;
      }
   }

   this.L$0 = $this$launch;
   this.label = 3;
   if (CoroutineTestKt.test(this) == var3) {
      return var3;
   }
}

this.L$0 = $this$launch;
this.label = 4;
if (CoroutineTestKt.test(this) == var3) {
   return var3;
} else {
   return Unit.INSTANCE;
}

这部分比较有意思的地方是,这些状态还有 call method 的都不在 switch case 里面,这其实跟 Bytecode 有关,主要是因为这个结果是 反编译 出來的东西,所以会是这样的叠加方式。

我们可以看到,在状态机改变时:

Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//...ignore

    this.label = 1;
    if (CoroutineTestKt.test(this) == var3) {
       return var3;
    }

根据上述代码可以看出, 编译器内部有一个函数IntrinsicsKt.getCOROUTINE_SUSPENDED() 该函数代表当前的状态是否挂起。如果它回传的是 getCOROUTINE_SUSPENDED,代表这个 function 处在 挂起(suspended)的状态,意味着它可能当前正在进行耗时操作。这时候直接返回 挂起状态,等待下一次被 调用(invoke)

那什么时候会再一次被 调用(invoke) 呢?

这时候就要看传入到该挂起函数的的 Continuation ,這裡可以觀察一下 BaseContinuationImplresumeWith 的操作:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
 public final override fun resumeWith(result: Result<Any?>) {
    var current = this
    var param = result
    while (true) {
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!!
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted()
            if (completion is BaseContinuationImpl) {
                current = completion
                param = outcome
            } else {
                completion.resumeWith(outcome)
                return
            }
        }
    }
 }
//...ignore
}

原则上 resumeWith 在一开始 Coroutine 被创建时就会执行(所以需要 case 0 做初始化),可以看到 invokeSuspend会被执行到。(probeCoroutineResumed 那個看起來是 debug 用的請無視),通过执行 invokeSuspend 开始执行状态机,如果该 continuation 的状态是挂起,就会执行return,重新执行 invokeSuspend,等下一次被唤醒,再次被唤醒后,继续执行,直到得到结果,并且将结果通过 continuation((name in completion)resumeWith返回结果,结束此次执行,接着继续执行挂起函数的的 invokeSuspend ,如此反复直至最终结束。

到這裡,我們知道了, suspend标记的函数内部是通过状态机才实现的挂起恢复的,并且利用状态机来记录Coroutine执行的状态

执行挂起函数时可以得到它的状态为:getCOROUTINE_SUSPENDED

不过又有问题来了,当挂起函数判断条件为:getCOROUTINE_SUSPENDED时执行了 return,代表它已经结束了,那它怎么能继续执行呢?而且还有办法在执行完后通知协程。

这里我们拿一段代码来看看:

suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){

    val result = doSomethingLongTimeProcess(text) 
    result //result 是個 String
}

它 decompile 後:

public static final Object suspendFunction(@NotNull final String text, @NotNull Continuation $completion) {
   return BuildersKt.withContext(
(CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
//...ignore
   }), $completion);
}

会发现,该函数 return 的不是 String而是一个Object,那这个Object是什么呢?其实就是COROUTINE_SUSPENDED

要证明这点其实很简单,如下代码,调用该 suspendFunction 就可以了

Object text = instance.suspendFunction("", new Continuation<String>() {
    @NotNull
    @Override
    public CoroutineContext getContext() {
        return Dispatchers.getMain();
    }

    @Override
    public void resumeWith(@NotNull Object o) {

    }
});
System.out.println(text);

結果:

COROUTINE_SUSPENDED

Process finished with exit code 0

PS:如果该函数时一个普通函数,没有标记suspend 则会直接返回结果。

根据上边我们这么多的分析,我们可以解释那段代码了。

fun main() {
    GlobalScope.launch {
        val text = suspendFunction("text")
        println(text) // print after delay
    }

}

suspend fun suspendFunction(text:String) = withContext(Dispatchers.IO){

    val result = doSomethingLongTimeProcess(text)
    result
}

首先,Kotlin编译器会把 main() 里面的代码反编译生成一个Continuation,而 launch block 的部分生成一個有限的状态机,并包装进 Continuation 里面那个叫 invokeSuspend(result) 的方法里头,并做为初次 resumeWith

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

invokeSuspend(result) 会在该 ContinuationresumeWith 执行的时候执行。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

第一次执行 invokeSuspend(result) 的时候,会执行到 suspendFunction(String),并传入包裝好的 Continuation

Continuation { // suspendFunction(text)
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val text = doSomethingLongTimeProcess(context)
                return 後執行 continuation.resultWith(text)

            }
        }
    }
}

suspendFunction 自己本身也是一個挂起函数,所以它也会包裝成一个 Continuation (但这边就单纯很多,虽然也会生成状态机,但其实就是直接跑doSomethingLongTimeProcess())。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

因为会进行耗时操作,所以直接回传COROUTINE_SUSPENDED,让原先执行该挂起函数的Threadreturn 并执行其他东西,而 suspendFunction则在另一条 Thread上把耗时任务完成。

Continuation { // GlobalScope.Lanuch()
    var label = 0
    fun invokeSuspend(result:Any):Any{
        when(label){
            0->{
                val functionResult = suspendFunction("text",this)
                lable = 1
                if(functionResult == COROUTINE_SUSPENDED){
                    return functionResult
                }
            }
            1->{
                throwOnFailure(result)
                break
            }
        }
        val text = result as String
        print(text)
    }
}

等待 suspendFunction 的耗时任务完成后,利用传入的 ContinuationresumeWith 把结果传入,这个动作会执行到挂起函数的invokeSuspend(result),并传入结果,该动作就能让挂起函数得到suspendFunction(String)的结果。

PS:上面那段代码实际上是伪代码,实际业务会比这复杂的多

所以事实上,挂起函数就是我把我的 callback 給你,等你结束后再用我之前给你的 callback 回调给我,你把你的 callback 給我,等我结束后我用之前你给我的 callback 通知你。

挂起函数时如何自行切换线程的?

原则上,挂起函数在执行时,就会决定好要用哪个 Dispatcher,然后就会建立挂起点,一般情况下,会走到 startCoroutineCancellable,然后执行createCoroutineUnintercepted,也就是上面提到的:resumeWithinvokeSuspend

我们进入到startCoroutineCancellable内部再看看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

createCoroutineUnintercepted 最后会产出一个 Continuation ,而resumeCancellableWith 其实就是我们前面说到的初始化操作, 這行会去执行状态机 case 0

至于 intercepted() ,到底要拦截啥,其实就是把生成的 Continuation 拦截给指定的 ContinuationInterceptor (这东西包裝在 CoroutineContext 里面,原则上在指定 Dispatcher 的时候就已经建立好了)

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

这里可以注意到 interceptContinuation(Continuation) ,可以用他追下去,发现他是 ContinuationInterceptor 的方法 ,再追下去可以发现CoroutineDispatcher 继承了他:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

可以发现该动作产生了一个 DispatchedContinuation,看看 DispatchedContinuation ,可以注意到刚才有提到的 resumeCancellableWith

inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

原则上就是利用 dispatcher 来決定需不需要 dispatch,沒有就直接执行了 resumeUndispatchedWith

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
    withCoroutineContext(context, countOrElement) {
        continuation.resumeWith(result)
    }
}

其实就是直接跑 continuationresumeWith

那回头看一下,其实就可以发现是 CoroutineDispatcher 决定要用什么 Thread 了。

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
}

其实知道这个东西后,就可以向下去找它的 Child ,就能找到 HandlerDispatcher 了。

isDispatchNeeded 就是说是否需要切换线程

dispatch 则是切换线程的操作

可以看到这两个方法在 HandlerDispatcher 的执行:

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

可以看到CoroutineContext根本没有用到。

为什么呢?其实原因主要是: 挂起函数是设计给 Kotlin 用的,并不是专门设计给 Android用的,所以 Android 要用的话,还是需要实现 CoroutineDispatcher 的部分,这实际上是两个体系的东西。那 CoroutineDispatcherdispatch 有提供 CoroutineContext,但不见的 Android 这边会用到,所以就有这个情況了。

其他诸如 Dispatcher.Default ,他用到了 线程池(Executor)Dispatcher.IO 则是用到了一个叫 工作队列(WorkQueue) 的东西。

所以每一个 Dispatcher 都有自己的一套实现,目前有提供四种 Dispatcher

作者:kevinEegets
链接:https://juejin.cn/post/7064859645440950302

上一篇下一篇

猜你喜欢

热点阅读