Kotlin 协程

2021-04-17  本文已影响0人  星流星

深入剖析 Kotlin 协程

什么是协程

协程是什么?可以看看知乎上的大佬们怎么说。

https://www.zhihu.com/question/342261454/answer/800062080

总结起来:协程,英文名 Coroutine,是一种比线程更加轻量级的存在,正如一个线程可以拥有多个线程,一个线程也可以拥有多个协程(这句话不是那么的正确)。协程是不被操作系统内核管理的,是由程序控制的(也就是在用户态执行)。

协程看上去也是子程序, 但执行过程中, 在子程序内部可中断, 然后转而执行别的子程序, 在适当的时候再返回来接着执行。
协程就是一段程序或者函数能够被挂起,让出运行权,然后等会再恢复继续执行。协程通过让出运行权来实现写作。所以最核心的概念就是挂起和恢复。

Kotlin 中的协程

Kotlin 中为我们提供了如下两个创建协程的 API:


@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

标准库提供了 createCoroutine 的函数来创建协程,下面来看一个简单的例子:

fun main() {
    val continuation = suspend {
        println("协程执行中...")
        "协程的返回值"
    }.createCoroutine(object : Continuation<String> {
        override fun resumeWith(result: Result<String>) {
            println("协程执行结束: $result")
        }

        override val context: CoroutineContext = EmptyCoroutineContext

    })

    continuation.resume(Unit)
}

看到上面的代码可能说这是啥?啥玩意儿?

我们一点一点来看,首先来看看 suspend 函数,它的定义如下:

@kotlin.internal.InlineOnly
@SinceKotlin("1.2")
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@RequireKotlin("1.2.30", level = DeprecationLevel.HIDDEN, versionKind = RequireKotlinVersionKind.COMPILER_VERSION)
public inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

看到这里脑瓜子是不是又嗡嗡的。这里的 R 是什么,呀!按照上面的推断 R 应该是 String。suspend 函数的返回值应该是 suspend () -> R 。合着我传递一个 suspend () -> R,然后给我调用一下返回,闹着玩呢?但是这里值得注意的是 suspend 是一个函数名,{} 内部的才是 suspend () -> R。就是通过这样的写法来说明 {} 中的部分是 suspend() -> R 类型。suspend 是一个关键字,所以这里有编译器做的时,这样写是给编译器看,说我这是一个 suspend () -> T 类型的函数。(牵强解释一波)

被 suspend 修饰的部分我们可能称之为协程体。createCoroutine 函数中有一个参数 completion,类型为 Continuation。协程执行完了会调用 completion 的 resumeWith 来告诉我们执行结束了。我们来看看这个 Continuation 到底是个啥?


/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

很简单,一个变量 context 存储了一个 CoroutineContext 类型的变量。可以看出这个是协程的上下文,类似于 Android 中的 context。保存了协程的上下文。这个等会再说。还有一个 resumeWith 方法来告诉执行结束了。

可以看到 createCoroutine 函数也返回了一个 Continuation。然后再看看最后一行代码 continuation.resume()。然后点进 resume 方法中一看。

@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

这咋有调用到了 resumeWith 了。那么不写最后一行会怎么呢?你会发现,啥输入都没有。所以说创建好的协程并不会立即开始。调用 resumeWith 表示我这边已经准备就绪了,你可以开始了。

或许你已经猜到了 Continuation 才是协程的核心。没错,协程执行完成然会通过 resumeWith 告诉已经执行完成,resumeWith 中再继续执行其他的部分。

但是我们看起来好像还是云里雾里的。让我们简单粗暴起来。既然 Kotlin 我不懂,那就反编译成 Java,让我瞅瞅。说干就干。

public final class CreateCoroutineKt {
  public static final void main() {
    CreateCoroutineKt$main$continuation$1 createCoroutineKt$main$continuation$1 = new CreateCoroutineKt$main$continuation$1(null);
    boolean bool1 = false;
    Continuation continuation = 
      
      ContinuationKt.createCoroutine(createCoroutineKt$main$continuation$1, new CreateCoroutineKt$main$continuation$2());
    Continuation continuation1 = continuation;
    Unit unit = Unit.INSTANCE;
    boolean bool2 = false;
    Result.Companion companion = Result.Companion;
    boolean bool3 = false;
    continuation1.resumeWith(Result.constructor-impl(unit));
  }
  
  static final class CreateCoroutineKt$main$continuation$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    int label;
    
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      String str;
      boolean bool;
      Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (this.label) {
        case 0:
          ResultKt.throwOnFailure($result);
          str = ";
          bool = false;
          System.out.println(str);
          return ";
      } 
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
    CreateCoroutineKt$main$continuation$1(Continuation param1Continuation) {
      super(1, param1Continuation);
    }
    
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      return (Continuation<Unit>)new CreateCoroutineKt$main$continuation$1(completion);
    }
    
    public final Object invoke(Object param1Object) {
      return ((CreateCoroutineKt$main$continuation$1)create((Continuation)param1Object)).invokeSuspend(Unit.INSTANCE);
    }
  }
  
  public static final class CreateCoroutineKt$main$continuation$2 implements Continuation<String> {
    public void resumeWith(@NotNull Object result) {
      String str = "" + Result.toString-impl(result);
      boolean bool = false;
      System.out.println(str);
    }
    
    @NotNull
    private final CoroutineContext context = (CoroutineContext)EmptyCoroutineContext.INSTANCE;
    
    @NotNull
    public CoroutineContext getContext() {
      return this.context;
    }
  }
}

PS:这里用的反编译工具是 JD-GUI。IDEA 的反编译感觉有点问题。

反编译后的代码有点长。而且都是一堆奇奇怪怪的东西。我们逐步来看。首先来看协程的主体部分:CreateCoroutineKt$main$continuation$1,这个是一个编译生成的内部类,对于这些 Lambda 表达式,Kotlin 都会帮我们生成一个内部类。如果没有参数就会实现 Function0,一个参数就会实现 Function1,以此类推,直到 Function22。我们来看看这里的 Function1 的源码。

public interface Function1<in P1, out R> : Function<R> {
    /** Invokes the function with the specified argument. */
    public operator fun invoke(p1: P1): R
}

如果要调用 Function1 的实例最终要调用 invoke 方法。

我们来看看这个类的继承关系:

create_coroutine.png

这次让我们从源头开始,我们来看看 createCoroutine:

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit>

看到了 expect 关键子,就不得不提一下 kotlin 的 Multiplatform,这里只是表示“预计”会有一个,具体的还得看各大平台的实现,这里我们通过 IDEA 搜索找找实现,它的 JVM 平台的实现如下:

@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

继续看看 probeCoroutineCreate,发现这个玩意是 debugger 下的一些内容:

@SinceKotlin("1.3")
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    /** implementation of this function is replaced by debugger */
    return completion
}

先跳过这个。

BaseContinuation 是什么,让我们回头看看我们反编译的结果中的 SuspendLambda:

@SinceKotlin("1.3")
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}

@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

看到了没 BaseContinuationImpl,这里就不继续展开了源码了,BaseContinuationImpl 是一个抽象类,它的 create 方法如下:

public open fun create(completion: Continuation<*>): Continuation<Unit> {
       throw UnsupportedOperationException("create(Continuation) has not been overridden")
}

来看看我们反编译后的实现:

@NotNull
public final Continuation<Unit> create(@NotNull Continuation completion) {
  Intrinsics.checkNotNullParameter(completion, "completion");
  return (Continuation<Unit>)new CreateCoroutineKt$main$continuation$1(completion);
}

这里创建了一个 CreateCoroutineKt$main$continuation$1 实例。

createCoroutineUnitercepted 函数返回了 CreateCoroutineKt$main$continuation$1,该实例的 Continuation 是通过 probeCoroutineCreated(completion) 函数创建的,我们现在先不关注这个。

接下来让我们回到 createCoroutine,发现这里有一个 intercepted。这就不得不提起 Kotlin 协程中的一大堆内容了。

在讲这些之前我们来看看另一个被我们遗漏的东西。前面我们展示了两个创建协程的 API,那么另一个究竟是什么呢?

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

区别就在于协程体前面多了一个 R。R.() -> T 它可以理解为 R 的一个匿名的扩展函数。协程体可以很轻松的调用 R 里面的方法,这样就可以把轻松的实现协程的作用域了。

下面来写一个简单的小例子。

fun createCoroutineWithScope() {

    class SimpleCoroutineScope {
        val say = "我是一个简单的作用域"
    }

    fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
        block.createCoroutine(receiver, object : Continuation<T> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext

            override fun resumeWith(result: Result<T>) {
                println("协程执行结束了. $result")
            }

        }).resume(Unit)
    }

    launchCoroutine(SimpleCoroutineScope()) {
        println("协程开始执行了...")
        println(say)
        "协程的返回值"
    }
}

还有一组直接启动协程的 API:

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

/**
 * Starts a coroutine with receiver type [R] and result type [T].
 * This function creates and starts a new, fresh instance of suspendable computation every time it is invoked.
 * The [completion] continuation is invoked when the coroutine completes with a result or an exception.
 */
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}

可以看到和直接创建 Coroutine 相比,只是少了 SafeContinuation 的包装。当然 SafeContinuation 也不是简单的包装,里面也有自己的逻辑,我们后面再说这个。

协程的上下文

Context 上下文,这个不难理解,像 Android 中也有上下文 Context,Spring 中的 applicationContext。协程也有自己的上下文。
当然协程的上下文中需要保存很多的信息,那么如何保存这些信息呢?我们可以看看 CoroutineContext 的接口:

@SinceKotlin("1.3")
public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key].
     */
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

    /**
     * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
     */
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

这个 CoroutineContext 中的东西真不少。我们先不着急分析。我们先来看看我们之前常用的 EmptyCoroutineContext,这个是 Kotlin 帮助我们实现好的,顾名思义就是一个空的 CoroutineContext。

接下来关键到了,协程上下文可以像数字一样加减,神奇不。协程上下文能够加减在于其重载了 +, -, [] 操作符。
CoroutineContext 它是一个协程上下文的集合,集中的类型就是 Element。可以看到 Element 本身也实现 CoroutineContext 的接口,作为是为了 API 使用起来方便。Element 还有一个属性 key,这个 key 就相当于 Element 在协程上下文中的唯一标志,可以用来索引 Element

CoroutineContext 有一个实现 CombinedContext。它的构造函数如下:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable

left 是一个 CoroutineContext,element 的类型是 Element 它也是 CoroutineContext 的一个实现类。所以最终大致是:

(CombinedContext(CombinedContext(CombinedContext(...)), element), element)

感兴趣的可以自己研究一下。

下面我们来简单的使用一下协程的上下文。

首先我们来看看一个抽象类,它能够让我们更加方便的实现协程上下文的元素。

@SinceKotlin("1.3")
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

这个抽象类很简单,只是把 key 变成了构造器的参数。

下面来创建一个简单的协程上下文:

class CoroutineName(val name: String): AbstractCoroutineContextElement(Key) {
    companion object Key: CoroutineContext.Key<CoroutineName>

    override fun toString() = name
}

fun main() {
    var coroutineContext: CoroutineContext = EmptyCoroutineContext
    coroutineContext += CoroutineName("简单的协程名")
    suspend {
        println("协程执行中... 协程名为: [${coroutineContext[CoroutineName]}].")
        "协程执行结束"
    }.startCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = coroutineContext

        override fun resumeWith(result: Result<String>) {
            println("协程执行结束: $result")
        }
    })
}

协程上下文中也可以进行异常处理,我们来简单演示一下:

class CoroutineExceptionHandler(private val onErrorAction: (Throwable) -> Unit)
    : AbstractCoroutineContextElement(Key) {
    companion object Key: CoroutineContext.Key<CoroutineExceptionHandler>

    fun onError(error: Throwable) {
        error.printStackTrace()
        onErrorAction(error)
    }
}

var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("简单的协程名")
coroutineContext += CoroutineExceptionHandler {
    println("呀!出错了,${it.message}")
}

override fun resumeWith(result: Result<String>) {
    result.onFailure {
        context[CoroutineExceptionHandler]?.onError(it)
    }.onSuccess { 
        println("协程执行成功了!")
    }
}

可以再协程中构造一个简单的异常,比如除零异常来尝试一下效果。

协程的挂起

前面谈到协程概念的时候我们说到了协程的挂起和恢复,那么究竟什么是挂起,怎么挂起,又怎样恢复。

再 Kotlin 中被 suspend 修饰的函数称为挂起函数,这个挂起函数只能再协程体或者其他挂起函数内使用。所以这里函数就分为了两种,普通函数和挂起函数。普通函数中不可以调用挂起函数,挂起函数中既可以调用挂起函数也可以调用普通的函数。

suspend fun suspendFunctionA() {
    println("我是一个挂起函数")
}

suspend fun suspendFunctionB()
        = suspendCoroutine<Int> { continuation ->
    thread {
        continuation.resumeWith(Result.success(5))
    }
}

上面的两个挂起函数有啥不一样?
第一个挂起函数就像一个普通函数一样,第二个挂起函数看着有点不太明白,suspendCoroutine 这个函数看起来好奇怪,首先我们来看看这个函数的源码:

@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

suspendCoroutine 函数的第一行是一个 Kotlin 的特性,它可以确保 block 只被执行一次。

我们再看这个函数有一个参数 block 中有一个参数 Continuation<T> 这个是什么,我们的 suspendFunctionB 中好像也没有这个参数呀?那这个哪里来的?我们来写一个简单的协程来调用一下这个挂起函数。

fun main() {
    suspend {
        suspendFunctionB()
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("协程执行结束: $result")
        }

    })
}

我们再反编译一下看看结果:

public final class SuspendFunctionKt {
  
  @Nullable
  public static final Object suspendFunctionB(@NotNull Continuation $completion) {
    boolean bool1 = false, bool2 = false;
    Continuation continuation1 = $completion;
    boolean bool3 = false;
    SafeContinuation safeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(continuation1));
    Continuation continuation = (Continuation)safeContinuation;
    int $i$a$-suspendCoroutine-SuspendFunctionKt$suspendFunctionB$2 = 0;
    ThreadsKt.thread$default(false, false, null, null, 0, new SuspendFunctionKt$suspendFunctionB$2$1(continuation), 31, null);
    if (safeContinuation.getOrThrow() == IntrinsicsKt.getCOROUTINE_SUSPENDED())
      DebugProbesKt.probeCoroutineSuspended($completion); 
    return safeContinuation.getOrThrow();
  }
 
  
  public static final void main() {
    SuspendFunctionKt$main$1 suspendFunctionKt$main$1 = new SuspendFunctionKt$main$1(null);
    boolean bool = false;
    ContinuationKt.startCoroutine(suspendFunctionKt$main$1, (Continuation)new Object());
  }
}
final class SuspendFunctionKt$suspendFunctionB$2$1 extends Lambda implements Function0 {
   // $FF: synthetic field
   final Continuation $continuation;

   // $FF: synthetic method
   // $FF: bridge method
   public Object invoke() {
      this.invoke();
      return Unit.INSTANCE;
   }

   public final void invoke() {
      String var1 = "挂起函数执行中...";
      boolean var2 = false;
      System.out.println(var1);
      Continuation var10000 = this.$continuation;
      Companion var4 = Result.Companion;
      Integer var5 = 5;
      boolean var3 = false;
      var10000.resumeWith(Result.constructor-impl(var5));
   }

   SuspendFunctionKt$suspendFunctionB$2$1(Continuation var1) {
      super(0);
      this.$continuation = var1;
   }
}


这里我们保留了我们感兴趣的部分。
看到这里不禁有疑问为什么我们的挂起函数 suspendFunctionB 会有一个 Continuation 参数,我们明明什么参数都没有呀!你一定猜到了这个 Continuation 是编译器生成的,那么这个参数是哪里来的呢?我们再来看看另外一段反编译后的代码。

static final class SuspendFunctionKt$main$1 extends SuspendLambda implements Function1<Continuation<? super Integer>, Object> {
    int label;
    
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (this.label) {
        case 0:
          ResultKt.throwOnFailure($result);
          this.label = 1;
          if (SuspendFunctionKt.suspendFunctionB((Continuation<? super Integer>)this) == object)
            return object; 
          return SuspendFunctionKt.suspendFunctionB((Continuation<? super Integer>)this);
        case 1:
          ResultKt.throwOnFailure($result);
          return $result;
      } 
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    
    SuspendFunctionKt$main$1(Continuation param1Continuation) {
      super(1, param1Continuation);
    }
    
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      return (Continuation<Unit>)new SuspendFunctionKt$main$1(completion);
    }
    
    public final Object invoke(Object param1Object) {
      return ((SuspendFunctionKt$main$1)create((Continuation)param1Object)).invokeSuspend(Unit.INSTANCE);
    }

我们可以看到调用 suspendFunctionB 的地方,传入了 this,所以 suspendFunction 的参数就是编译器生成的匿名内部类的实例。为什么要这么做呢?我们接着看。

CPS 变换

CPS 变换(Continuation-Passing-Style Transformation),是通过传递 Continuation 来控制异步程序的跳转的。
程序被挂起的时候,怎样恢复,我们想想线程被中断的时候需要将中断点保存再调用栈中,那么协程肯定也需要保存这个挂起点。
Kotlin 协程将挂起点保存再了 Continuation 对象中。然后恢复的时候只需要执行它的恢复调用并且把对应的参数或者异常传入即可。

这里我们可以看到如果想要挂起那么我们可以通过 suspendCoroutine 来获取 suspend 函数的 Continuation 参数。

那么挂起函数一定会挂起吗?当然上面的例子中的 suspendFunctionA 肯定不会挂起,它不能算作一个挂起函数,它只是一个带有 Continuation 参数的普通的函数。我们来看看一个不会挂起的挂起函数的例子。

suspend fun notSuspendFunction() = suspendCoroutine<Int> { continuation ->
    continuation.resume(200)
}

我们再来反编译一下看看:

  @Nullable
  public static final Object notSuspendFunction(@NotNull Continuation $completion) {
    boolean bool1 = false, bool2 = false;
    Continuation continuation1 = $completion;
    boolean bool3 = false;
    SafeContinuation safeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(continuation1));
    Continuation continuation = (Continuation)safeContinuation;
    int $i$a$-suspendCoroutine-SuspendFunctionKt$notSuspendFunction$2 = 0;
    Continuation continuation2 = continuation;
    Integer integer = Boxing.boxInt(200);
    boolean bool4 = false;
    Result.Companion companion = Result.Companion;
    boolean bool5 = false;
    continuation2.resumeWith(Result.constructor-impl(integer));
    if (safeContinuation.getOrThrow() == IntrinsicsKt.getCOROUTINE_SUSPENDED())
      DebugProbesKt.probeCoroutineSuspended($completion); 
    return safeContinuation.getOrThrow();
  }

反编译的结果也没有啥不同的,为啥 notSuspendFunction 不会挂起呢?问题就再 SafeContinuation 中。我们来看看这个类的实现:

@PublishedApi
@SinceKotlin("1.3")
internal expect class SafeContinuation<in T> : Continuation<T> {
    internal constructor(delegate: Continuation<T>, initialResult: Any?)

    @PublishedApi
    internal constructor(delegate: Continuation<T>)

    @PublishedApi
    internal fun getOrThrow(): Any?

    override val context: CoroutineContext
    override fun resumeWith(result: Result<T>): Unit
}

这里肯定不是 SafeContinuation 的实现,我们来找找实现。

@PublishedApi
@SinceKotlin("1.3")
internal actual class SafeContinuation<in T>
internal actual constructor(
    private val delegate: Continuation<T>,
    initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
    @PublishedApi
    internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)

    public actual override val context: CoroutineContext
        get() = delegate.context

    @Volatile
    private var result: Any? = initialResult

    private companion object {
        @Suppress("UNCHECKED_CAST")
        private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
            SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
        )
    }

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    @PublishedApi
    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = delegate as? CoroutineStackFrame

    override fun getStackTraceElement(): StackTraceElement? =
        null

    override fun toString(): String =
        "SafeContinuation for $delegate"
}

首先我们来分析一下,我们调用了 SafeContinuation 只有一个参数的构造器,所以这里的 initialResult 就是 UNDECIDED,也就是说 result = UNDECIDED。然后调用 resumeWith,通过 CAS 将 result 更新为我们传入的值 Result.success(200),并且返回。
然后 notSuspendFunction 中调用 getOrThrow 将我们传入的 Result.success(200) 返回。
之后的话会调用 getOrThrow。这里的 result 是我们传入的 200,所以就直接返回了。

我们再来看看 SuspendFunctionB 反编译后的代码。与 notSuspendFunction 不同的是,最后的 resume() 方法的调用是在另外一个线程中,所以这里肯定会先调用 getOrThrow。调用 getOrThrow 后 result 的状态后被 CAS 更新为 COROUTINE_SUSPENDED。所以之后调用 resumeWith 的时候就会调用 delegateresumeWith
总结一下,是否会挂起主要是看 resumeWithgetOrThrow 那个先调用,如果是异步调用就会先调用 getOrThrow,否则就会先调用 resumeWith。如果是异步的话 suspend 方法返回的就是 COROUTINE_SUSPEND

这里其实我们一直都漏掉了一个 interceptor,下面我们来看看 interceptor

协程的拦截器

协程中我们还有一个比较关心的点就是协程的调度,那么协程是如何调度的?
在 Continuation 和协程上下文的基础上,Kotlin 又提供了一个叫做拦截器(Interceptor) 的组件,它允许我们拦截协程异步回调时的恢复调用。既然可以拦截恢复调用,那么想要操纵协程调用应该也没有问题。

在 Kotlin 协程中想要实现一个简单的协程的拦截器我们只需实现 ContinuationInterceptor 接口,然后把这个实现类的实例添加到协程上下文中即可。拦截器会在协程被挂起的时候调用。

fun main() {
    suspend {
        suspendFunctionB()
        suspendFunctionB()
        123
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
            get() = LogInterceptor()

        override fun resumeWith(result: Result<Int>) {
            result.getOrThrow()
        }
    })
}

class LogInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*>
        get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        LogContinuation(continuation)

}

class LogContinuation<T>(private val continuation: Continuation<T>)
    : Continuation<T> by continuation {

    override fun resumeWith(result: Result<T>) {
        println("before resumeWith: $result")
        continuation.resumeWith(result)
        println("after resumeWith.")
    }
}

上面的程序在启动的时候被挂起了一起,调用 suspendFunctionB 挂起函数的时候调用了一次。这里值得注意的一点是挂起函数不一点会挂起,详见上面不会挂起的挂起函数。

我们可以在上面的源码中找到协程启动时候的挂起。

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

可以看到这里调用了一次 intercepted,来让我们看看这里的 interceptor 究竟做了什么?

@Transient
private var intercepted: Continuation<Any?>? = null

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

上面分析过 createCoroutineUnintercepted 函数最终返回的是一个匿名内部类,它的父类是 SuspendLambda,我们可以在这给类的父类 ContinuationImpl 中找到这个方法。在我们的程序中刚开始变量 intercepted 变量肯定是空的,所以接着从协程上下文中获取拦截器,如果获取到就会返回协程上下文中的拦截器。

然后紧急着调用拦截器的 resume。这就是协程刚开始第一次调用协程的拦截器。

之后我们又在反编译 suspendFunctionBnotSuspendFunction 的时候见到了拦截器的调用。

@Nullable
  public static final Object suspendFunctionB(@NotNull Continuation $completion) {
    boolean bool1 = false, bool2 = false;
    Continuation continuation1 = $completion;
    boolean bool3 = false;
    SafeContinuation safeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(continuation1));
    Continuation continuation = (Continuation)safeContinuation;
    int $i$a$-suspendCoroutine-SuspendFunctionKt$suspendFunctionB$2 = 0;
    ThreadsKt.thread$default(false, false, null, null, 0, new SuspendFunctionKt$suspendFunctionB$2$1(continuation), 31, null);
    if (safeContinuation.getOrThrow() == IntrinsicsKt.getCOROUTINE_SUSPENDED())
      DebugProbesKt.probeCoroutineSuspended($completion); 
    return safeContinuation.getOrThrow();
  }

接下来我们看看这里的 intercepted 的调用。

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

这里是 Continuation 的一个扩展方法,也是调用了 ContinuationImplintercepted 方法。最终会返回我们添加的拦截器实例,但是这里并没有直接调用,而是传入了 SafeContinuation 中,也就是说 SafeContinuation 中的 delegateresume 方法被调用就会调用到我们添加的拦截器。

让我们继续往下看这段程序,紧接着调用了 SafeContinuation 中的 getOrThrow 方法。

    @PublishedApi
    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

前面分析过,如果 SafeContinuationresumeWith 方法被异步调用了,那么肯定会先调用 getOrThrow 方法。由于初始化的时候 result 为 UNDECIDED,所以这里会把 result 修改为 COROUTINE_SUSPEND,并返回 COROUTINE_SUSPEND,这个值的意思就是当前协程被挂起了。然后在异步调用 resumeWith 的时候就会将当前的 result 修改为 RESUME,然后调用 delegate.resumeWith(result),从而调用我们的拦截器。


上面我们只剖析了一个简单的协程的启动调用,Kotlin 官方使用这些 API 实现了一套更加复杂的协程。

写的有点乱,后续更新一下,敬请期待。

上一篇 下一篇

猜你喜欢

热点阅读