Kotlin学习之协程的协程上下文与调度器

2020-05-15  本文已影响0人  James999

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中,它们被定义在了 Kotlin 的标准库里。
协程上下文是各种不同元素的集合。其中主元素是协程中的 Job, 我们在前面的文档中⻅过它以及它的
调度器,而本文将对它进行介绍。

调度器与线程

协程上下文包含一个 协程调度器 (参⻅ CoroutineDispatcher)它确定了哪些线程或与线程相对应的协程执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。
所有的协程构建器诸如 launch 和 async 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。
尝试下面的示例:

    fun main22() = runBlocking<Unit> {
        launch { // context of the parent, main runBlocking coroutine
            println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
            println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
            println("Default               : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
            println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
        }
    }

它执行后得到了如下输出(也许顺序会有所不同):

2020-05-14 16:00:06.778 32275-32275/com.yy.kotlindemo I/System.out: Unconfined            : I'm working in thread main
2020-05-14 16:00:06.792 32275-32275/com.yy.kotlindemo I/System.out: main runBlocking      : I'm working in thread main
2020-05-14 16:00:06.793 32275-32461/com.yy.kotlindemo I/System.out: Default               : I'm working in thread DefaultDispatcher-worker-2
2020-05-14 16:00:06.794 32275-32462/com.yy.kotlindemo I/System.out: newSingleThreadContext: I'm working in thread MyOwnThread

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器在调用它的线程启动了一个协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。非受限的调度器非常适用于执行不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。
另一方面,该调度器默认继承了外部的 CoroutineScope。 runBlocking 协程的默认调度器,特别是, 当它被限制在了调用者线程时,继承自它将会有效地限制协程在该线程运行并且具有可预测的 FIFO 调度。

    fun main23() = runBlocking<Unit> {
        launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
            println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
            delay(500)
            println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
        }
        launch { // context of the parent, main runBlocking coroutine
            println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
            delay(1000)
            println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
        }
    }

执行后的输出:

2020-05-14 16:18:49.141 1810-1810/com.yy.kotlindemo I/System.out: Unconfined      : I'm working in thread main
2020-05-14 16:18:49.153 1810-1810/com.yy.kotlindemo I/System.out: main runBlocking: I'm working in thread main
2020-05-14 16:18:49.653 1810-1908/com.yy.kotlindemo I/System.out: Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
2020-05-14 16:18:50.191 1810-1810/com.yy.kotlindemo I/System.out: main runBlocking: After delay in thread main

所以,该协程的上下文继承自 runBlocking {...} 协程并在 main 线程中运行,当 delay 函数调用的时候,非受限的那个协程在默认的执行者线程中恢复执行。
非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该在通常的代码中使用。

调试协程与线程

协程可以在一个线程上挂起并在其它线程上恢复。 甚至一个单线程的调度器也是难以弄清楚协程在何时何地正在做什么事情。使用通常调试应用程序的方法是让线程在每一个日志文件的日志声明中打印线程的名字。这种特性在日志框架中是普遍受支持的。但是在使用协程时,单独的线程名称不会给出很多协程上下文信息,所以 kotlinx.coroutines 包含了调试工具来让它更简单。

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
    fun main24() = runBlocking<Unit> {
        val a = async {
            log("I'm computing a piece of the answer")
            6
        }
        val b = async {
            log("I'm computing another piece of the answer")
            7
        }
        log("The answer is ${a.await() * b.await()}")
    }

这里有三个协程,包括 runBlocking 内的主协程 (#1) , 以及计算延期的值的另外两个协程 a (#2)和 b (#3)。 它们都在 runBlocking 上下文中执行并且被限制在了主线程内。 这段代码的输出如下:

2020-05-14 16:42:00.011 3397-3397/com.yy.kotlindemo I/System.out: [main] I'm computing a piece of the answer
2020-05-14 16:42:00.012 3397-3397/com.yy.kotlindemo I/System.out: [main] I'm computing another piece of the answer
2020-05-14 16:42:00.012 3397-3397/com.yy.kotlindemo I/System.out: [main] The answer is 42

这个 log 函数在方括号种打印了线程的名字,并且你可以看到它是 main 线程,并且附带了当前正在其上执行的协程的标识符。这个标识符在调试模式开启时,将连续分配给所有创建的协程。

当 JVM 以 -ea 参数配置运行时,调试模式也会开启。 你可以在 DEBUG_PROPERTY_NAME 属性的文档中阅读有关调试工具的更多信息

在不同线程间跳转

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码

    fun main25() {
        newSingleThreadContext("Ctx1").use { ctx1 ->
            newSingleThreadContext("Ctx2").use { ctx2 ->
                runBlocking(ctx1) {
                    log("Started in ctx1")
                    withContext(ctx2) {
                        log("Working in ctx2")
                    }
                    log("Back to ctx1")
                }
            }
        }
    }

它演示了一些新技术。其中一个使用 runBlocking 来显式指定了一个上下文,并且另一个使用withContext 函数来改变协程的上下文,而仍然驻留在相同的协程中,正如可以在下面的输出中所⻅到的。

2020-05-14 18:15:53.193 11334-11500/com.yy.kotlindemo I/System.out: [Ctx1 @coroutine#1] Started in ctx1
2020-05-14 18:15:53.200 11334-11501/com.yy.kotlindemo I/System.out: [Ctx2 @coroutine#1] Working in ctx2
2020-05-14 18:15:53.201 11334-11500/com.yy.kotlindemo I/System.out: [Ctx1 @coroutine#1] Back to ctx1

注意,在这个例子中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候, 它使用了 Kotlin 标准库中的 use 函数来释放该线程。

上下文中的作业

协程的 Job 是上下文的一部分,并且可以使用 coroutineContext [Job] 表达式在上下文中检索它:

println("My job is ${coroutineContext[Job]}")

在调试模式下,它将输出如下这些信息:

System.out: My job is "coroutine#1":BlockingCoroutine{Active}@ab34d69

请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true的一种方便的快捷方式。

子协程

当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程作业的 子作业。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。
然而,当使用 GlobalScope 来启动一个协程时,则新协程的作业没有父作业。 因此它与这个启动的作用域无关且独立运作。

fun main27() = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            // it spawns two other jobs, one with GlobalScope
            GlobalScope.launch {
                println("job1: I run in GlobalScope and execute independently!")
                delay(1000)
                println("job1: I am not affected by cancellation of the request")
            }
            // and the other inherits the parent context
            launch {
                delay(100)
                println("job2: I am a child of the request coroutine")
                delay(1000)
                println("job2: I will not execute this line if my parent request is cancelled")
            }
        }
        delay(500)
        request.cancel() // cancel processing of the request
        delay(1000) // delay a second to see what happens
        println("main: Who has survived request cancellation?")
    }

这段代码的输出如下,

I/System.out: job1: I run in GlobalScope and execute independently!
I/System.out: job2: I am a child of the request coroutine
I/System.out: job1: I am not affected by cancellation of the request
I/System.out: main: Who has survived request cancellation?

父协程的职责

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动,并且不必使用Job.join 在最后的时候等待它们:

   // 启动一个协程来处理某种传入请求(request)
    fun main28() = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            repeat(3) { i -> // // 启动少量的子作业
                launch  {
                    delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                    println("Coroutine $i is done")
                }
            }
            println("request: I'm done and I don't explicitly join my children that are still active")
        }
        request.join() // 等待请求的完成,包括其所有子协程
        println("Now processing of the request is complete")
    }

结果如下所示:

/System.out: Coroutine 0 is done
/System.out: Coroutine 1 is done
/System.out: Coroutine 2 is done
/System.out: Now processing of the request is complete

命名协程以用于调试

当协程经常打印日志并且你只需要关联来自同一个协程的日志记录时, 则自动分配的 id 是非常好的。然而,当一个协程与特定请求的处理相关联时或做一些特定的后台任务,最好将其明确命名以用于调试目的。 CoroutineName 上下文元素与线程名具有相同的目的。当调试模式开启时,它被包含在正在执行此协程的线程名中。
下面的例子演示了这一概念:

    fun main29() = runBlocking(CoroutineName("main")) {
        log("Started main coroutine")
        // run two background value computations
        val v1 = async(CoroutineName("v1coroutine")) {
            delay(500)
            log("Computing v1")
            252
        }
        val v2 = async(CoroutineName("v2coroutine")) {
            delay(1000)
            log("Computing v2")
            6
        }
        log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
    }

程序执行使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I/System.out: [main @main#1] Started main coroutine
I/System.out: [main @v1coroutine#2] Computing v1
I/System.out: [main @v2coroutine#3] Computing v2
I/System.out: [main @main#1] The answer for v1 / v2 = 42

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

   fun main30() = runBlocking<Unit> {
        launch(Dispatchers.Default + CoroutineName("test")) {
            println("I'm working in thread ${Thread.currentThread().name}")
        }
    }

这段代码使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I/System.out: I'm working in thread DefaultDispatcher-worker-1 @test#2

协程作用域

让我们将关于上下文,子协程以及作业的知识综合在一起。假设我们的应用程序拥有一个具有生命周期的对象,但这个对象并不是一个协程。举例来说,我们编写了一个 Android 应用程序并在 Android 的activity 上下文中启动了一组协程来使用异步操作拉取并更新数据以及执行动画等等。所有这些协程必须在这个 activity 销毁的时候取消以避免内存泄漏。当然,我们也可以手动操作上下文与作业,以结合activity 的生命周期与它的协程,但是 kotlinx.coroutines 提供了一个封装:CoroutineScope 的抽象。 你应该已经熟悉了协程作用域,因为所有的协程构建器都声明为在它之上的扩展。
我们通过创建一个 CoroutineScope 实例来管理协程的生命周期,并使它与 activit 的生命周期相关联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() 工厂函数。前者创建了一个通用作用域,而后者为使用 Dispatchers.Main 作为默认调度器的 UI 应用程序 创建作用域:

class Activity {
         private val mainScope = MainScope()
         fun destroy() {
            mainScope.cancel()
         }
        // 继续运行...

或者,我们可以在这个 Activity 类中实现 CoroutineScope 接口。最好的方法是使用具有默认工厂函数的委托。 我们也可以将所需的调度器与作用域合并(我们在这个示例中使用Dispatchers.Default)。

class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
// 继续运行......

现在,在这个 Activity 的作用域中启动协程,且没有明确指定它们的上下文。在示例中,我们启动了十个协程并延迟不同的时间:

    // 在 Activity 类中
    fun doSomething() {
// 在示例中启动了 10 个协程,且每个都工作了不同的时⻓
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

在 main 函数中我们创建 activity,调用测试函数 doSomething ,并且在 500 毫秒后销毁这个activity。 这取消了从 doSomething 启动的所有协程。我们可以观察到这些是由于在销毁之后, 即使我们再等一会儿,activity 也不再打印消息。

import kotlinx.coroutines.*

class Activity {
    private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes
    
    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

fun main() = runBlocking<Unit> {
    val activity = Activity()
    activity.doSomething() // run test function
    println("Launched coroutines")
    delay(500L) // delay for half a second
    println("Destroying activity!")
    activity.destroy() // cancels all coroutines
    delay(1000) // visually confirm that they don't work
}

这个示例的输出如下所示:

2020-05-15 14:33:48.635 30433-30433/com.yy.kotlindemo I/System.out: Launched coroutines
2020-05-15 14:33:48.867 30433-30521/com.yy.kotlindemo I/System.out: Coroutine 0 is done
2020-05-15 14:33:49.082 30433-30521/com.yy.kotlindemo I/System.out: Coroutine 1 is done
2020-05-15 14:33:49.181 30433-30433/com.yy.kotlindemo I/System.out: Destroying activity!

你可以看到,只有前两个协程打印了消息,而另一个协程在 Activity.destroy() 中单次调用了job.cancel() 。

线程局部数据

有时,能够将一些线程局部数据传递到协程与协程之间是很方便的。 然而,由于它们不受任何特定线程的约束,如果手动完成,可能会导致出现样板代码。
ThreadLocal, asContextElement 扩展函数在这里会充当救兵。它创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。它很容易在下面的代码中演示:

    fun main32() = runBlocking<Unit> {
        threadLocal.set("main")
        println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
            println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
            yield()
            println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        }
        job.join()
        println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }

在这个例子中我们使用 Dispatchers.Default 在后台线程池中启动了一个新的协程,所以它工作在线程池中的不同线程中,但它仍然具有线程局部变量的值, 我们指定使用threadLocal.asContextElement(value = "launch") , 无论协程执行在什么线程中都是没有问题的。 因此,其输出如(调试)所示:

I/System.out: Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
I/System.out: Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
I/System.out: After yield, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
I/System.out: Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

这很容易忘记去设置相应的上下文元素。如果运行协程的线程不同, 在协程中访问的线程局部变量则可能会产生意外的值。 为了避免这种情况,建议使用 ensurePresent 方法并且在不正确的使用时快速失败。
ThreadLocal 具有一流的支持,可以与任何 kotlinx.coroutines 提供的原语一起使用。 但它有一个关键限制,即:当一个线程局部变量变化时,则这个新值不会传播给协程调用者(因为上下文元素无法追踪所有 ThreadLocal 对象访问)
,并且下次挂起时更新的值将丢失。 使用 withContext 在协程中更新线程局部变量,详⻅ asContextElement。
另外,一个值可以存储在一个可变的域中,例如 class Counter(var i: Int) ,是的,反过来, 可以存储在线程局部的变量中。然而,在这个案例中你完全有责任来进行同步可能的对这个可变的域进行的并发的修改。
对于高级的使用,例如,那些在内部使用线程局部传递数据的用于与日志记录 MDC 集成,以及事务上下文或任何其它库,请参⻅需要实现的 ThreadContextElement 接口的文档。

上一篇下一篇

猜你喜欢

热点阅读