Kotlin编程

Kotlin Coroutine suspend 原理解析

2021-07-21  本文已影响0人  wo883721

一. 回调地狱

1.1 同步操作

假如我们有这样一个需求:

fun childFun1(): Int {
    return 10
}

fun childFun2(): Int {
    return 20
}

fun childFun3(num1: Int, num2: Int): Int {
    return num1 + num2
}

fun parentFun() : Int {
    val num1 = childFun1()
    val num2 = childFun2()
    val sum = childFun3(num1, num2)
    return sum
}

即: 从多个操作中(childFun1, childFun2) 获取值,然后再对这些值进行处理(childFun3),程序逻辑非常直观易懂。

例如,先上传用户选择的图片到服务端储存,获取服务端返回的图片对应的地址,然后这些地址设置到对应位置。

1.2 异步操作

如果这些操作都是耗时操作,为了不阻塞线程,需要将这些耗时操作放到其他线程中,即

val executor: ExecutorService = Executors.newFixedThreadPool(2)

fun childFun1(callback: (Int) -> Unit): Unit {
    executor.execute {
        Thread.sleep(1000)
        callback(10)
    }
}

fun childFun2(callback: (Int) -> Unit): Unit {
    executor.execute {
        Thread.sleep(2000)
        callback(20)
    }
}

fun childFun3(num1: Int, num2: Int, callback: (Int) -> Unit): Unit {
    executor.execute {
        Thread.sleep(500)
        callback(num1 + num2)
    }
}

fun parentFun(callback: (Int) -> Unit) : Unit {
    childFun1(fun(num1) {
        childFun2(fun(num2) {
            childFun3(num1, num2, callback)
        })
    })
}

因为是异步操作,结果值不能直接返回,只能通过 callback 方式异步回传,所以当异步操作很多的时候,整个回调链就很长了,让代码逻辑显得不清晰。

1.3 协程

suspend fun childFun1(): Int {
    delay(1000)
    return 10
}

suspend fun childFun2(): Int {
    delay(2000)
    return 20
}

suspend fun childFun3(num1: Int, num2: Int): Int {
    delay(500)
    return num1 + num2
}

suspend fun parentFun() : Int {
    val num1 = childFun1()
    val num2 = childFun2()
    val sum = childFun3(num1, num2)
    return sum
}

我们可以看到和同步方式操作一模一样,只不过方法上多了 suspend 关键字而已。

注: 这里的 delay 方法,不会像 Thread.sleep 阻塞当前线程。

二. suspend 原理

上面说过,suspend 不会阻塞当前线程,那么它怎么将异步操作的数据,同步传递回来呢?答案其实也是回调,只不过隐藏的很深,我们慢慢分析。

suspend fun childFun1(): Int {
    Thread.sleep(1000)
    return 10
}

suspend fun childFun2(): Int {
    Thread.sleep(2000)
    return 20
}

suspend fun childFun3(num1: Int, num2: Int): Int {
    Thread.sleep(500)
    return num1 + num2
}

suspend fun parentFun() : Int {
    val num1 = childFun1()
    val num2 = childFun2()
    val sum = childFun3(num1, num2)
    return sum
}

这里将 delay 换成 Thread.sleep ,先看 suspend 反编译的 java 代码

public final class CoroutineKt {
   @Nullable
   public static final Object childFun1(@NotNull Continuation $completion) {
      Thread.sleep(1000L);
      return Boxing.boxInt(10);
   }

   @Nullable
   public static final Object childFun2(@NotNull Continuation $completion) {
      Thread.sleep(2000L);
      return Boxing.boxInt(20);
   }

   @Nullable
   public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
      Thread.sleep(500L);
      return Boxing.boxInt(num1 + num2);
   }

   @Nullable
   public static final Object parentFun(@NotNull Continuation $completion) {
      Object $continuation;
      label37: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label37;
            }
         }

         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;
            int I$0;
            int I$1;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineKt.parentFun(this);
            }
         };
      }

      Object var10000;
      label31: {
         int num1;
         int num2;
         Object var6;
         label30: {
            Object $result = ((<undefinedtype>)$continuation).result;
            var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(((<undefinedtype>)$continuation).label) {
            case 0:
               ResultKt.throwOnFailure($result);
               ((<undefinedtype>)$continuation).label = 1;
               var10000 = childFun1((Continuation)$continuation);
               if (var10000 == var6) {
                  return var6;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            case 2:
               num1 = ((<undefinedtype>)$continuation).I$0;
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break label30;
            case 3:
               num2 = ((<undefinedtype>)$continuation).I$1;
               num1 = ((<undefinedtype>)$continuation).I$0;
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break label31;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            num1 = ((Number)var10000).intValue();
            ((<undefinedtype>)$continuation).I$0 = num1;
            ((<undefinedtype>)$continuation).label = 2;
            var10000 = childFun2((Continuation)$continuation);
            if (var10000 == var6) {
               return var6;
            }
         }

         num2 = ((Number)var10000).intValue();
         ((<undefinedtype>)$continuation).I$0 = num1;
         ((<undefinedtype>)$continuation).I$1 = num2;
         ((<undefinedtype>)$continuation).label = 3;
         var10000 = childFun3(num1, num2, (Continuation)$continuation);
         if (var10000 == var6) {
            return var6;
         }
      }

      int sum = ((Number)var10000).intValue();
      return Boxing.boxInt(sum);
   }
}

2.1 Continuation

我们注意到 suspend 修饰的方法,转成 java 方法时,会在方法最后面添加上 Continuation 类型的参数:

suspend fun childFun3(num1: Int, num2: Int): Int {
    Thread.sleep(500)
    return num1 + num2
}
//  变成了
@Nullable
public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
      Thread.sleep(500L);
      return Boxing.boxInt(num1 + num2);
}

这个 Continuation 实例非常重要,它是协程能够实现异步回调的关键对象。

/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
  1. context : 协程的上下文对象,里面储存着协程上下文环境信息,比如父协程,线程协调器等
  2. resumeWith 方法: 唤醒协程继续执行的方法

注: 这里的唤醒,并不是说协程被线程阻塞了

2.2 协程体

观察 childFun1childFun2childFun3 方法,除了参数上多了一个 $completion 参数,并没有其他变化,那是因为这三个方法中,没有调用其他 suspend 方法,所以和普通函数没有多大区别。

suspend fun parentFun() : Int {
    val num1 = childFun1()
    val num2 = childFun2()
    val sum = childFun3(num1, num2)
    return sum
}

// 转变成了

 @Nullable
   public static final Object parentFun(@NotNull Continuation $completion) {
      Object $continuation;
      label37: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label37;
            }
         }

         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;
            int I$0;
            int I$1;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineKt.parentFun(this);
            }
         };
      }

      Object var10000;
      label31: {
         int num1;
         int num2;
         Object var6;
         label30: {
            Object $result = ((<undefinedtype>)$continuation).result;
            var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(((<undefinedtype>)$continuation).label) {
            case 0:
               ResultKt.throwOnFailure($result);
               ((<undefinedtype>)$continuation).label = 1;
               var10000 = childFun1((Continuation)$continuation);
               if (var10000 == var6) {
                  return var6;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            case 2:
               num1 = ((<undefinedtype>)$continuation).I$0;
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break label30;
            case 3:
               num2 = ((<undefinedtype>)$continuation).I$1;
               num1 = ((<undefinedtype>)$continuation).I$0;
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break label31;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            num1 = ((Number)var10000).intValue();
            ((<undefinedtype>)$continuation).I$0 = num1;
            ((<undefinedtype>)$continuation).label = 2;
            var10000 = childFun2((Continuation)$continuation);
            if (var10000 == var6) {
               return var6;
            }
         }

         num2 = ((Number)var10000).intValue();
         ((<undefinedtype>)$continuation).I$0 = num1;
         ((<undefinedtype>)$continuation).I$1 = num2;
         ((<undefinedtype>)$continuation).label = 3;
         var10000 = childFun3(num1, num2, (Continuation)$continuation);
         if (var10000 == var6) {
            return var6;
         }
      }

      int sum = ((Number)var10000).intValue();
      return Boxing.boxInt(sum);
   }

我们发现 parentFun 方法转成的 java 代码,比我们想象中的要多,这个就是协程实现的秘密。

方法流程分析:

  1. 创建 $continuation 对象, 它是 ContinuationImpl 类的实例,并且储存了方法参数的 $completion 实例,作用是当本协程体(parentFun 方法) 执行完毕之后,会回调 $completion 实例的 resumeWith 方法,唤醒调用方的协程。

注: 当调用 $continuation 对象的 resumeWith 方法会调用 invokeSuspend 方法,就会再次调用 parentFun 方法。

  1. label 不同的时候,执行的逻辑不同:
    1 . 当 label = 0 时,先将 label 设置成 1 ,并调用 childFun1 方法,参数就是当前协程体 $continuation。如果返回的结果值是 COROUTINE_SUSPENDED ,那么就直接 parentFun 方法退出,实现协程挂起。
    2 . 当 label = 1 时,一定是 childFun1 方法内部通过 $continuationresumeWith 方法回调来的,得到 childFun1 方法异步结果值。我们的例子中,不会走到这一步,因为我们直接返回了结果值。
    3 . 当 label = 2 时, 同上,是 childFun2 方法内部通过 $continuationresumeWith 方法回调来的,得到 childFun2 方法异步结果值。
    4 . 当 label = 3 时, 同上原理。

即: label = 1 表示调用了 childFun1 方法;label = 2 表示调用了 childFun2 方法;label = 3 表示调用了 childFun3 方法。并等待 resumeWith 回调带来的结果值。

这种实现方式,我们称之为状态机。

2.3 协程函数的原理

通过上面的分析,我们了解到协程是如何实现挂起和恢复的。
不同于线程的阻塞和唤醒,协程的挂起是方法直接返回,不执行接下来的代码,它的恢复是通过被调用放来实现的。
以上面的例子为例:

  1. parentFun 方法先调用了 childFun1 协程方法,并将自己的协程体 $continuation 传递给 childFun1 方法
  2. 如果 childFun1 方法返回值是 COROUTINE_SUSPENDED ,那么 parentFun 方法直接退出,不会执行 childFun2 方法;直到 childFun1 方法调用 $continuationresumeWith 方法,就会重新调用 parentFun 方法,并因为 label = 1,直接获取结果值,再调用 childFun2 方法。
  3. 如果 childFun1 方法返回值不是 COROUTINE_SUSPENDED,即不需要挂起,那么就继续调用 childFun2 方法。
  4. childFun2childFun3 方法调用过程和 childFun1 方法逻辑一样。

因此我们可以总结:

  1. 当在协程函数内部调用其他协程函数时,都会生成一个挂起点,即这里的 childFun1 childFun2childFun3 ,都对应一个 label
  2. 调用其他协程函数的时候,都会将本协程函数的协程体 $continuation 传递给被调用的协程函数,以便被调用的协程函数可以回调恢复本协程函数。
  3. 当被调用的协程函数返回 COROUTINE_SUSPENDED,即表明被调用的协程函数是一个异步操作,希望本协程函数挂起,等待被调用的协程函数执行完成回调它。本协程函数就会从这个挂起点直接返回,不再执行下面的代码,直到被调用的协程函数通过 $continuationresumeWith 方法来恢复本协程函数,并继续执行下面代码,直到遇到下一个挂起点。

本质上协程也是通过回调实现异步操作的,只不过 kotlin 编译器将协程函数变成状态机。
也明白了为什么 suspend 函数为什么只能在 suspend 函数内部调用,而不能在普通函数内部执行,因为没有隐藏的 $continuation 对象。

三. 创建协程

上面分析了 suspend 函数,但是现在这个函数,没办法执行,因为 suspend 函数都需要 Continuation 实例,那么第一个 Continuation 实例该如何创建呢?
kotlin 标准库中提供了两个函数来创建 Continuation 实例

3.1 createCoroutine 方法

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

通过 suspend 函数去创建 Continuation 对象,返回 SafeContinuation 类的实例,这个类以后我们有机会分析。
completion 当协程完成之后,会调用它的 resumeWith 方法。

例如:

fun main() {
    val coroutine = (::parentFun).createCoroutine(object : Continuation<Int>{
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("result:$result")
        }
    })
    // 执行
    coroutine.resume(Unit)
}

3.2 startCoroutine 方法

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}

这个方法不仅创建了 Continuation 对象,并且还直接执行了协程函数。
例如:

fun main() {
    (::parentFun).startCoroutine(object : Continuation<Int>{
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("result:$result")
        }
    })
}

3.3 suspendCoroutine 方法

上面的例子中 childFun1 这些方法,我们并没有实现异步操作,这里有两个难点:

  1. 没有办法返回 COROUTINE_SUSPENDED 值,因为它不是 Int 类型,且外部获取不到这个值。
  2. 获取不到调用方的 Continuation 对象,来恢复调用方。

针对这种情况,kotlin 提供了 suspendCoroutine 方法来解决这个问题。

@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }

childFun1 方法进行改变

suspend fun childFun1(): Int {
    Thread.sleep(1000)
    return 10
}

// 转变成
val executor = Executors.newScheduledThreadPool(1) 

suspend fun childFun1(): Int = suspendCoroutine { continuation ->
    executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
}

这里我们并没有直接返回,而是通过 executor 线程池,延迟 1 秒钟之后再返回值,模拟异步操作。

转成的 java 代码

   @Nullable
   public static final Object childFun1(@NotNull Continuation $completion) {
      boolean var1 = false;
      boolean var3 = false;
      SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
      Continuation continuation = (Continuation)var4;
      int var6 = false;
      executor.schedule((Runnable)(new CoroutineKt$childFun1$2$1(continuation)), 1000L, TimeUnit.MILLISECONDS);
      Object var10000 = var4.getOrThrow();
      if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
         DebugProbesKt.probeCoroutineSuspended($completion);
      }

      return var10000;
   }

因为 suspendCoroutine 函数时内联函数,因此函数内容直接复制到 childFun1 函数中。

SafeContinuation 的 getOrThrow 方法

    @PublishedApi
    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }

可以看出在没有调用 resumeWith 方法时,就返回 COROUTINE_SUSPENDED

val executor = Executors.newScheduledThreadPool(2)

suspend fun childFun1(): Int = suspendCoroutine { continuation ->
    executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
}

suspend fun childFun2(): Int = suspendCoroutine { continuation ->
    executor.schedule(fun() { continuation.resume(20) }, 2000, TimeUnit.MILLISECONDS)
}

suspend fun childFun3(num1: Int, num2: Int): Int = suspendCoroutine { continuation ->
    executor.schedule(fun() { continuation.resume(num1 + num2) }, 1000, TimeUnit.MILLISECONDS)
}

suspend fun parentFun() : Int {
    val num1 = childFun1()
    val num2 = childFun2()
    val sum = childFun3(num1, num2)
    return sum
}

fun main() {
    (::parentFun).startCoroutine(object : Continuation<Int>{
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("result:$result")
        }
    })
}
上一篇下一篇

猜你喜欢

热点阅读