Kotlin

[Android] 深入理解Kotlin协程

2022-09-26  本文已影响0人  super可乐

Kotlin协程

协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快

我们知道线程是CPU调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin中,Dispatcher(内部是线程池或者Android主线程)是调度器,可以调度协程运行在一个或多个线程之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:

协程优势:

Suspend 非阻塞式挂起函数

协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call)return之外,协程添加了 suspendresume

suspend 函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function 的提示;如需调用 suspend 函数,只能从其他 suspend 函数进行调用,或通过使用协程构建器(例如 launch)来启动新的协程。

suspend 函数相比于普通函数内部多了一个 Continuation 续体实例(Kotlin转成Java代码之后即可看到),suspend函数中可以调用普通函数,但普通函数却不能调用suspend函数

 suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

suspend 修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java 中的Thread.sleep() 会阻塞当前线程,suspend 修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但可以去执行协程外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停 -> 子线程执行完毕 -> 协程从挂起点恢复

如在上面的示例中,get() 在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get 会恢复已挂起的协程,继续执行show(result) 方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起

Kotlin 使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。

CPS变换 + Continuation续体 + 状态机

CPS变换 (Continuation-Passing-Style Transformation) 是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation 来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行

Continuation意为续体,只存在于挂起函数中。Kotlin 中当一个普通函数加上suspend 关键字之后,就成为了挂起函数,如:

private suspend fun suspendFuc() {}

我们对此函数进行反编译,查看对应的Java代码为:

private final Object suspendFuc(Continuation $completion) {
      return Unit.INSTANCE;
}

可以看到Java代码中系统帮我们多生成了一个Continuation 类型的参数。Continuation 是一个接口类型,表示在返回T类型值的挂起点之后的延续,其中类型T代表的是原来函数的返回值类型。

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

resumeWith 恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。

上述函数中加入一个delay 函数,delay 函数本身也是一个挂起函数:

    private suspend fun suspendFuc(): String {
        delay(1000)
        return "value"
    }

经过Tools -> Kotlin -> Show Kotlin Bytecode 反编译查看:

   private final Object suspendFuc(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

          //将要执行的Continuation逻辑传入ContinuationImpl中
         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

             //invokeSuspend()会在恢复协程挂起点时调用
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineBaseFragment.this.suspendFuc(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      //返回此状态意味着函数要被挂起
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

      // 状态机逻辑,通过label进行分块执行
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      return "value";
   }

suspend挂起函数经过Kotlin编译器编译之后会进行CPS变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1,上述函数逻辑中,通过switch(label){} 状态机将逻辑分块执行,首先label = 0,此时会先将label置为1,接着调用了delay挂起函数,返回Intrinsics.COROUTINE_SUSPENDED意味着函数要被挂起;

当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result) 方法恢复执行,$result是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。

此时label变为1ResultKt.throwOnFailure($result)中通过 if (value is Result.Failure) throw value.exception判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break,执行到最后的return "value" 逻辑,这样整个方法的流程就运行完毕了。

总结:suspend挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机来运转的

CoroutineContext

我们知道 Android 中的 Context(子类有Application、Activity、Service)可以获取应用资源,可以启动ActivityService等。CoroutineContext意为协程上下文,其作用可以类比 Context,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称等。

public interface CoroutineContext {
    //从该上下文中返回具有给定[key]的元素 或 null 
    public operator fun <E : Element> get(key: Key<E>): E?

    //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    //删除带有指定[key]的元素。
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

    //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

协程总是会运行在以 CoroutineContext 类型的上下文中,其中plus操作符被重写,多个CoroutineContext相加时,会生成一个CombinedContext( 内部可以理解成一个Element都不一样的链表);同时 CombinedContext拥有 map索引能力,集合中的每个元素都有一个唯一的 Key

继承关系

Element 定义在CoroutineContext内部,是它的内部接口。CoroutineContext 使用以下元素集定义协程的行为:

下面就来详细看看几种元素的使用。

CoroutineContext几种具体实现

1、Job & SupervisorJob

Job是协程的句柄。使用launchasync创建的协程都会返回一个Job实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:

val job1 = GlobalScope.launch { }
val job2 = MainScope().launch { }

override fun onDestroy() {
    super.onDestroy()
    job1.cancel()
    job2.cancel()
}

注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel中使用viewModelScopeLifecycleOwner中使用lifecycleScope,可以在各自的生命周期中自动关闭协程。

  val job = GlobalScope.launch(context = Job() + Dispatchers.Main,
      start = CoroutineStart.LAZY) {
      // 逻辑部分
   }

  job常用的API:
  -  job.start()  // 对应 start = CoroutineStart.LAZY
  -  job.cancelAndJoin()  //cancel() + join()
  -  job.cancel() // 取消
  -  job.isActive // 协程是否存活
  -  job.isCancelled // 协程是否被取消
  -  job.isCompleted //协程是否已经执行完毕

Job 有以下状态:

State isActive isCompleted isCancelled
New (optional initial state) false false false
Active (default initial state) true false false
Completing (transient state) true false false
Cancelling (transient state) false false true
Cancelled (final state) false true true
Completed (final state) false true false

状态流转:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

协程通过状态机方式实现自动回调。

SupervisorJob的使用

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }

GlobalScope.launch(exceptionHandler) {
   //子Job1
    launch(SupervisorJob()) {
        delay(200)
        throw NullPointerException()
     }
   //子Job2
    launch {
        delay(300)
        log("child2 execute successfully")
    }
    //子Job3
    launch {
        delay(400)
        log("child3 execute successfully")
    }
    log("parent execute successfully")
}

注意子Job1传入的是 SupervisorJob,当发生异常时,兄弟协程(job2/job3)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
2022-08-30 23:31:30.446  E/TTT: child2 execute successfully
2022-08-30 23:31:30.545  E/TTT: child3 execute successfully

如果将job1中的更换为默认时,执行结果为:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException

可以看到job1中使用默认设置时,当job1发生了异常,兄弟协程也会被取消。注意这里job2/job3可以被取消是因为delay()中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1使用默认配置,job3也不会被取消:

//子Job3
 launch(Dispatchers.IO) {
     Thread.sleep(400)
     log("child3 execute successfully")
 }

job3中没有了对协程状态的判断,所以即使job3被通知要取消协程了,依然会继续执行直到结束。

SuperviorJob源码浅析

public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

当子协程 job1/job2/job3 启动时,会和协程本身的 Job 形成父子关系。而当子协程抛异常时,父JobchildCancelled() 方法会被执行,且默认返回的是 true,表示取消 父Job 及其所有 子Job;如果 子Job中使用的是 SuperviorJobchildCancelled() 返回的是 false,表示 父Job及其未发生异常的 子Job都不会受影响。

2、CoroutineDispatcher
public actual object Dispatchers {
   @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

将工作分派到适当的线程。

withContext

withContext() 可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext() 来确保每个函数都是主线程安全的,这意味着,您可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。

3、CoroutineName

协程的名称,可用于调试。

4、CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {

    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    //当有未处理的异常发生时,该方法就会执行
    public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash,可以通过自定义CoroutineExceptionHandler来拦截异常,如下:

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) { // do something }              

自定义CoroutineExceptionHandler调用的是如下方法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

所以当发生异常时,会在handleException()中执行我们传入的handler(CoroutineContext, Throwable)方法。

CoroutineExceptionHandler使用示例

 val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
 lifecycleScope.launch(exceptionHandler) {
     log("parent execute start")
     val childExHandler =
                    CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
     //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
     val childJob = launch(childExHandler) {
         delay(1000)
         log("child execute")
         throw  IllegalArgumentException("error occur")
     }
     childJob.join()
     log("parent execute end")
 }

执行结果:

2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur

可见虽然在子协程中也自定义了CoroutineExceptionHandler,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job(),则异常会在子协程中处理而不会往上传播:

 //其他内容不变
 val childJob = launch(childExHandler + SupervisorJob()) {
     //其他内容不变
 }

执行结果:

2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end

可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。

:如果启动协程的是async,则CoroutineExceptionHandler中的handler方法并不会马上执行,必须调用deffered.await()时才会执行。

CoroutineScope 协程作用域

CoroutineScope 会跟踪它使用 launchasync 创建的所有协程。CoroutineScope 本身并不运行协程,但是通过CoroutineScope可以保证对协程进行统一管理,避免发生内存泄漏等,

常见的CoroutineScope

参考

【1】官网:利用 Kotlin 协程提升应用性能
【2】官网:Android 上的 Kotlin 协程
【3】Kotlin挂起函数原理
【4】揭秘Kotlin协程中的CoroutineContext

作者:小马快跑
链接:https://juejin.cn/post/7145800423641710599

上一篇 下一篇

猜你喜欢

热点阅读