Kotlin协程

2019-08-29  本文已影响0人  晨起清风

协程(Coroutine)

协程引入

异步加载图片

 
通过上面的例子对比,可以发现使用协程的优点:
把异步的代码转换成同步的写法,免除了回调的问题,并且不会阻塞线程;
请求结果返回时协程自动帮我们切回到主线程;
异常的处理比较简洁,协程内部的异常使用一个try...catch即可捕获;

协程概念

协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。

协程和线程,进程不同,它通常不是由操作系统底层直接支持,而是通过编译器和应用层的库实现。


image

Kotlin协程框程架

协程正式版1.0在Kotlin 1.3 上发布,协程的语言支持与 API 已完全稳定。
此后 kotlin 协程不再会被标注为 experimental,在 Kotlin 1.3. 之后的版本就可以使用协程代码了。

引入协程库
dependencies {
  implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.2'
}
image
协程基本术语

 

 

协程的启动

1.简单以 CoroutineScope 的扩展函数 launch 启动一个协程:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
        println("World!") // 在延迟后打印输出
    }
    println("Hello,") // 协程已在等待时主线程还在继续
    Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
}

launch 函数有三个参数:分别是 上下文启动模式协程体
launch 函数的返回值是 Job 类型。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

 

启动模式 功能特性
DEFAULT 立即开始执行协程体
LAZY 只有在需要(start、join、await)时才开始执行
ATOMIC 立即执行协程体,但在第一个挂起点前不能被取消
UNDISPATCHED 立即在当前线程执行协程体,直到第一个挂起点(后面取决于调度器)

 

State isActive isCompleted isCancelled
New (可选的初始状态) false false false
Active (默认的初始状态) true false false
Completing (中间状态) true false false
Cancelling (中间状态) false false true
Cancelled (最终状态) false true true
Completed (最终状态) false true false

一般而言,Job 创建后都处于 active 状态,表示这个 Job 已经被创建并且被启动了。
通过 协程构建器 函数的 start 参数可以修改这个状态。
比如如果使用 CoroutineStart.LAZY 作为 start 参数,则创建的 Job 处于 new 状态,
这个时候需要通过调用 Jobstart 或者 join 函数来把该 Job 转换为 active 状态。

处于 active 状态的 Job 表示 协程 正在执行。
如果执行过程中抛出了异常则会把该 Job 标记为 cancelling 状态。
除此之外,还可以通过调用 cancel 函数来把 Job 转换为 cancelling 状态。
然后当 Job 完成后就处于 cancelled 状态。

image

具有多个子 Job 的父 Job 会等待所有子 job 完成(或者取消)后,自己才会执行完成。

示例,调用 cancel 函数取消协程执行。

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancel() // 取消该任务
    job.join() // 等待任务执行结束
    println("main: Now I can quit.")
//sampleEnd
}

程序执行后的输出:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

 
2.使用 async 启动一个协程:
在概念上,async 就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发地工作。
不同之处在于 launch 返回一个Job 并且不附带任何结果值,而 async 返回一个 Deferred

Deferred 继承自 Job,所以通过 Deferred 也可以和 Job 一样来控制这个 协程。

Deferred 类似于 Java 里面的 future,一个轻量级的非阻塞 future
通过调用 Deferred 对象的 await() 函数来等待异步结果的返回。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {

    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 29
}

程序执行后的输出:

The answer is 42
协程的调度

上面提到协程上下文作为launch函数的参数。

CoroutineContext 接口的定义:

@SinceKotlin("1.3")
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>
        ...
    }
}

CoroutineContext 类似一个集合,它的元素就是源码中看到的 Element。
每一个 Element 都有一个 key,因此它可以作为元素出现,同时它也是 CoroutineContext 的子接口,因此也可以作为集合出现。

通常我们见到的上下文的类型是 CombinedContext 或者 EmptyCoroutineContext
一个表示上下文的组合,另一个表示什么功能都没有的空的上下文。

CoroutineContext 有两个非常重要的元素:JobDispatcher
Job 是当前的协程的实例,而 Dispatcher 决定了当前协程体执行的线程。

我们在协程体里面访问到的 coroutineContext 大多是这个 CombinedContext 类型,表示有很多具体的上下文实现的集合。
我们如果想要找到某一个特别的上下文实现,就需要用对应的 Key 来查找:

import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

fun main() {
    GlobalScope.launch {
        println(coroutineContext[Job]) //StandaloneCoroutine{Active}@4f5a4106
        println(coroutineContext[ContinuationInterceptor]) //DefaultDispatcher
    }
    Thread.sleep(2000)
}

 
Job 是一个 CoroutineContext.Element 的实现,内部有个伴生对象 Key
coroutineContext[Job] 这里的 Job 实际上是对它的伴生对象的引用。

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job> { ... }
    ...
}

ContinuationInterceptor 表示续体拦截器(以下简称拦截器),也是一个 CoroutineContext.Element 的实现。

@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
    /**
     * The key that defines *the* context interceptor.
     */    
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}

拦截器可以左右你的协程的执行。

以下是自定义拦截器的例子。让协程代码块运行在自定义的线程中。

suspend fun main() {
    val job: Job = GlobalScope.launch(MyContinuationInterceptor()) {
        Logger.debug(1)
        delay(1000)
        Logger.debug(2)
        launch {
            Logger.debug(3)
        }
    }
    job.join()
}

class MyContinuationInterceptor : ContinuationInterceptor {
    override val key = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        return MyContinuation(continuation)
    }
}

class MyContinuation<T>(val continuation: Continuation<T>) : Continuation<T> {
    override val context: CoroutineContext = continuation.context

    private val executor = Executors.newSingleThreadExecutor {
        Thread(it, "MyThreadExecutor").also { it.isDaemon = true }
    }

    override fun resumeWith(result: Result<T>) {
        Logger.debug(result)
        executor.submit {
            //切换线程(模拟简单的协程调度器)
            continuation.resumeWith(result)
        }
    }
}

所有协程启动的时候,都会有一次 Continuation.resumeWith 的操作。
对于受MyContinuationInterceptor影响的协程启动时,都会执行MyContinuation.resumeWith
delay 是挂起点,1000ms 之后需要继续调度执行该协程,也会执行MyContinuation.resumeWith

18:01:41:337 [main] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit)  // ①
18:01:41:358 [MyThreadExecutor] ConsoleMain3Kt$main$job$1.invokeSuspend(ConsoleMain3.kt:20): 1
18:01:42:371 [kotlinx.coroutines.DefaultExecutor] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit)  // ②
18:01:42:372 [MyThreadExecutor] ConsoleMain3Kt$main$job$1.invokeSuspend(ConsoleMain3.kt:22): 2
18:01:42:375 [MyThreadExecutor] MyContinuation.resumeWith(ConsoleMain3.kt:46): Success(kotlin.Unit)  // ③
18:01:42:376 [MyThreadExecutor] ConsoleMain3Kt$main$job$1$1.invokeSuspend(ConsoleMain3.kt:24): 3

 

Kotlin协程库中有专门的拦截器的实现,叫做 CoroutineDispatcher(协程调度器,以下简称调度器)
它确定了相应的协程在执行时使用哪些线程。

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...
}

它本身是协程上下文的子类,同时实现了拦截器的接口, dispatch 方法会在拦截器的方法 interceptContinuation 中调用,进而实现协程的调度。
所以如果我们想要实现自己的调度器,继承这个类就可以了。
不过通常我们都用协程框架提供的,它们定义在 Dispatchers 当中:

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

调度器 JVM
Default 线程池
Main UI线程
Unconfined 直接执行,直到遇到第一个挂起点
IO 线程池

指定调度器启动协程:

suspend fun main() {
    GlobalScope.launch(/** Dispatchers.Default **/) {
        Logger.debug(1)
        launch(Dispatchers.Main) {
            Logger.debug(2)
        }
    }.join()
}

GlobalScope.launch 默认使用 DefaultDispatcher 调度任务。
Dispatchers.Main 来确保 launch 启动的协程在调度时始终调度到 UI 线程。
在 Android 当中,Dispatchers.Main 引用的实例是 HandlerDispatcher

执行结果如下:

19:07:31:494 [DefaultDispatcher-worker-1] DispatchKt$main$2.invokeSuspend(Dispatch.kt:23): 1
19:07:31:591 [AWT-EventQueue-0] DispatchKt$main$2$1.invokeSuspend(Dispatch.kt:25): 2

 
其他创建调度器的方式:

suspend fun main() {
    Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }
        .asCoroutineDispatcher()
        .use { dispatcher ->
            GlobalScope.launch(dispatcher) {
                Logger.debug(1)
            }.join()
        }
    Logger.debug(2)
}

运行结果:

19:01:36:027 [MyThread] DispatchKt$main$3$1.invokeSuspend(Dispatch.kt:13): 1
19:01:36:051 [MyThread] DispatchKt.main(Dispatch.kt:16): 2
协程的异常处理

通过一个在 GlobalScope 中创建协程的示例来看一下协程的异常处理,中间添加了异常捕获。

fun main() = runBlocking {
    val job = GlobalScope.launch{
        try {
            Logger.debug("launch new coroutine")
            launch {
                Logger.debug("Throwing exception from launch")
                throw IndexOutOfBoundsException()
            }
        } catch (e: Exception) {
            Logger.error("Caught launch Exception: $e") //①
        }
    }
    job.join()
    Logger.debug("Joined failed job")

    val deferred = GlobalScope.async {
        Logger.debug("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
    }
    try {
        deferred.await()
        Logger.debug("Unreached")
    } catch (e: ArithmeticException) {
        Logger.debug("Caught ArithmeticException") //②
    }
}

输出结果:

16:05:04:654 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:9): launch new coroutine
16:05:04:671 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:11): Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-1 @coroutine#3" java.lang.IndexOutOfBoundsException
    at com.coroutine.console.exception.ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:12)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742)
16:05:04:678 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:19): Joined failed job
16:05:04:680 [DefaultDispatcher-worker-1 @coroutine#4] ConsoleExceptionDemoKt$main$1$deferred$1.invokeSuspend(ConsoleExceptionDemo.kt:22): Throwing exception from async
16:05:04:712 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:29): Caught ArithmeticException

从上面的输出的log,会发现并没有捕获到①处的异常。②处的异常却被捕获到了。

不同的协程构建器函数有不同的异常传递策略,在协程中异常传递分为两种类型。
一种是自动向上传递(launchactor),另外一种是把错误信息暴露给调用者(asyncproduce)。
前者对待异常是不处理的,类似于 Java 的 Thread.uncaughtExceptionHandler ,而后者依赖调用者来最终处理异常。

如果把代码改写成以下就可以捕获到异常:

fun main() = runBlocking {

    val handler = CoroutineExceptionHandler { _, exception ->
        Logger.error("Caught $exception")
    }

    val job = GlobalScope.launch(handler) {
        Logger.debug("launch new coroutine")
        launch {
            Logger.debug("Throwing exception from launch")
            throw IndexOutOfBoundsException()
        }
    }
    job.join()
    Logger.debug("Joined failed job")

    val deferred = GlobalScope.async {
        Logger.debug("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
    }
    try {
        deferred.await()
        Logger.debug("Unreached")
    } catch (e: ArithmeticException) {
        Logger.debug("Caught ArithmeticException") //②
    }
}
16:03:28:861 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:13): launch new coroutine
16:03:28:877 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$job$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): Throwing exception from launch
16:03:28:881 [DefaultDispatcher-worker-1 @coroutine#3] ConsoleExceptionDemoKt$main$1$invokeSuspend$$inlined$CoroutineExceptionHandler$1.handleException(CoroutineExceptionHandler.kt:82): Caught java.lang.IndexOutOfBoundsException
16:03:28:882 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:20): Joined failed job
16:03:28:883 [DefaultDispatcher-worker-1 @coroutine#4] ConsoleExceptionDemoKt$main$1$deferred$1.invokeSuspend(ConsoleExceptionDemo.kt:23): Throwing exception from async
16:03:28:915 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:30): Caught ArithmeticException

 
CoroutineExceptionHandler
对于通过 launchactor 构建器创建的协程,如果里面抛出了异常需要通过 CoroutineExceptionHandler 来捕获异常。
CoroutineExceptionHandler 类似于 Thread.uncaughtExceptionHandler 全局异常处理。
CoroutineExceptionHandler 并不算是一个全局的异常捕获,因为它只能捕获对应协程内未捕获的异常。

从上面代码看到 CoroutineExceptionHandler 可以作为 launch 函数的参数,也能猜到其类型属于 CoroutineContext

CoroutineExceptionHandler 接口定义:

public interface CoroutineExceptionHandler : CoroutineContext.Element {
    /**
     * Key for [CoroutineExceptionHandler] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    /**
     * Handles uncaught [exception] in the given [context]. It is invoked
     * if coroutine has an uncaught exception.
     */
    public fun handleException(context: CoroutineContext, exception: Throwable)
}

 
异常的传播

异常传播还涉及到 CoroutineScope (协程作用域,以下简称作用域)的概念。

CoroutineScope接口的定义:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

CoroutineScopekotlinx.coroutines 提供的一个抽象的封装,用来管理协程的生命周期。

比如:
在Android程序在上下文中启动了多个协程来为某个activity 进行异步操作来拉取以及更新数据,或作动画等。
当 activity 被销毁的时候这些协程必须被取消以防止内存泄漏。就需要用到 CoroutineScope
这样当这个 CoroutineScope 被取消的时候,里面所有的子协程也会自动取消。

每个协程构建器都是 CoroutineScope 的扩展函数,并且自动的继承了当前作用域的 coroutineContext
所以要使用协程必须要先创建一个对应的 CoroutineScope

协程执行代码块的 this 字段就代表了当前使用的 CoroutineScope 实例。
所以就能够在其内部继续启动子协程,比如下面的嵌套的 launch

image

 
总结:

协程的取消

取消是协作的

协程的取消是 协作 的。一段协程代码必须协作才能被取消。
所有 kotlinx.coroutines 中的挂起函数都是 可被取消的。
它们检查协程的取消, 并在取消时抛出 CancellationException
 
示例:

fun main() = runBlocking {
    val job1 = launch { // ①
        Logger.debug(1)
        delay(1000) // ②
        Logger.debug(2)
    }
    delay(100)
    Logger.debug(3)
    job1.cancel() // ③
    Logger.debug(4)
}

执行结果:

14:50:47:604 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:9): 1
14:50:47:645 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:14): 3
14:50:47:647 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:16): 4

程序分析:
这段代码 ① 处启动了一个子协程,它内部先输出 1,接着开始 delay, delay 与线程的 sleep 不同,它不会阻塞线程。
你可以认为它实际上就是触发了一个延时任务,告诉协程调度系统 1000ms 之后再来执行后面的代码;

而在这期间,我们在 ③ 处对刚才启动的协程触发了取消,因此在 ② 处的 delay 还没有回调的时候协程就被取消了。
因为 delay 可以响应取消,因此 delay 后面的代码就不会再执行了,因为② 处的 delay 会抛一个 CancellationException
 
下面来捕获一下这个Exception:

fun main() = runBlocking {
    val job1 = launch { // ①
        Logger.debug(1)
        try {
            delay(1000) // ②
        } catch (e: Exception) {
            Logger.error(e)
        }
        Logger.debug(2)
    }
    delay(100)
    Logger.debug(3)
    job1.cancel() // ③
    Logger.debug(4)
}
15:16:25:924 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:20): 3
15:16:25:926 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): 4
15:16:25:943 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): kotlinx.coroutines.JobCancellationException: Job was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@2be94b0f
15:16:25:943 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job1$1.invokeSuspend(ConsoleExceptionDemo.kt:17): 2

 
如果协程正在执行 计算任务,并且没有检查取消的话,那么它是不能被取消的。

fun main() = runBlocking {
    //sampleStart
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
            // 每秒打印消息两次
            if (System.currentTimeMillis() >= nextPrintTime) {
                Logger.debug("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 等待一段时间
    Logger.debug("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消一个任务并且等待它结束
    Logger.debug("main: Now I can quit.")
    //sampleEnd
}

输出结果:

15:21:11:418 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
15:21:11:856 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
15:21:12:356 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
15:21:12:665 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): main: I'm tired of waiting!
15:21:12:856 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 3 ...
15:21:13:356 [DefaultDispatcher-worker-2 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 4 ...
15:21:13:357 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): main: Now I can quit.

可以看到它连续打印出了“I'm sleeping” ,甚至在调用取消后, 任务仍然执行了五次循环迭代并运行到了它结束为止。
 
使计算代码可取消

我们有两种方法来使执行计算的代码可以被取消。
第一种方法是定期调用挂起函数来检查取消。对于这种目的,使用 yield 是一个好的选择。
另一种方法是显式的检查取消状态。让我们试下第二种方法。

fun main() = runBlocking {
    //sampleStart
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // 可以被取消的计算循环
            // 每秒打印消息两次
            if (System.currentTimeMillis() >= nextPrintTime) {
                Logger.debug("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 等待一段时间
    Logger.debug("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消一个任务并且等待它结束
    Logger.debug("main: Now I can quit.")
    //sampleEnd
}

运行结果

15:34:57:214 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
15:34:57:649 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
15:34:58:149 [DefaultDispatcher-worker-1 @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
15:34:58:460 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:22): main: I'm tired of waiting!
15:34:58:462 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:24): main: Now I can quit.

可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。
 
在 finally 中释放资源

可取消的挂起函数在被取消时会抛出 CancellationException,我们通常使用以下方式处理后续的收尾工作。

 
运行不可取消的代码块

在之前例子中任何尝试在 finally 块中调用挂起函数的代码都会抛出 CancellationException,因为运行此代码的协程被取消了。

所以良好的关闭操作(关闭一个文件、取消一个任务、或是关闭任何一种 通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。

然而,在真实的案例中,当你需要在一个 被取消的协程 中调用挂起函数, 你可以将相应的代码包装在 withContext(NonCancellable) {……} 中。

使用 withContext 函数以及 NonCancellable 上下文,见如下示例所示:

fun main() = runBlocking {
    //sampleStart
    val job = launch {
        try {
            repeat(1000) { i ->
                Logger.debug("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                Logger.debug("job: I'm running finally")
                delay(1000L)  //此处调用挂起函数
                Logger.debug("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // 延迟一段时间
    Logger.debug("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该任务并等待它结束
    Logger.debug("main: Now I can quit.")
    //sampleEnd
}

运行结果:

16:33:36:683 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 0 ...
16:33:37:199 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 1 ...
16:33:37:700 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1.invokeSuspend(ConsoleExceptionDemo.kt:16): job: I'm sleeping 2 ...
16:33:37:931 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:28): main: I'm tired of waiting!
16:33:37:990 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1$2.invokeSuspend(ConsoleExceptionDemo.kt:21): job: I'm running finally
16:33:38:991 [main @coroutine#2] ConsoleExceptionDemoKt$main$1$job$1$2.invokeSuspend(ConsoleExceptionDemo.kt:23): job: And I've just delayed for 1 sec because I'm non-cancellable
16:33:39:001 [main @coroutine#1] ConsoleExceptionDemoKt$main$1.invokeSuspend(ConsoleExceptionDemo.kt:30): main: Now I can quit.

 
超时取消

在大多数情况下,取消协程的执行是因为它执行的时间超过预期的时间了。
虽然,你可以手动获取到协程相应 Job 对象的引用,并启动另外一个协程在延迟一段时间后通过 Job 对象取消那个协程。
然而Kotlin协程库已经为我们准备好 withTimeout 函数来做这件事。

fun main() = runBlocking {
    //sampleStart
    withTimeout(1300L) {
        repeat(1000) { i ->
            Logger.debug("I'm sleeping $i ...")
            delay(500L)
        }
    }
    //sampleEnd
}

运行结果:

17:02:06:826 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 0 ...
17:02:07:343 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 1 ...
17:02:07:844 [main @coroutine#1] ConsoleExceptionDemoKt$main$1$1.invokeSuspend(ConsoleExceptionDemo.kt:15): I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
    at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:128)
    at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:94)
    at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.kt:307)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)
    at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)
    at java.lang.Thread.run(Thread.java:745)

withTimeout 抛出的 TimeoutCancellationExceptionCancellationException 的子类。
我们之前没有看到它的堆栈跟踪打印在控制台上。
这是因为在取消的协程中,CancellationException 被认为是协程完成的正常原因。
但是,在这个例子中,我们在main函数中使用了 withTimeout

 
自定义可取消的挂起函数

delay 可以响应取消操作,看下 delay 函数的实现:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

关键部分是其使用 suspendCancellableCoroutine 这个挂起内联函数给包装了一下。

/**
 * Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
 * the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
 */
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
        // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
        // cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

包括其他的可取消的挂起函数都是使用 suspendCancellableCoroutine 进行了封装。

image

自定义可取消的挂起函数:

使用Retrofit创建一个网络请求接口:

import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
import retrofit2.http.Path

val gitHubServiceApi by lazy {
    val retrofit = retrofit2.Retrofit.Builder()
        .client(OkHttpClient.Builder().build())
        .baseUrl("https://api.github.com")
        .addConverterFactory(GsonConverterFactory.create())
        .build()

    retrofit.create(GitHubServiceApi::class.java)
}

interface GitHubServiceApi {
    @GET("users/{login}")
    fun getUser(@Path("login") login: String): Call<User>
}

data class User(val id: String, val name: String, val url: String)

 
普通调用的代码:

fun main() {
    val callback = object : Callback<User> {
        override fun onFailure(call: Call<User>, t: Throwable) {
            Logger.debug("onFailure: $t")
        }

        override fun onResponse(call: Call<User>, response: Response<User>) {
            Logger.debug("onResponse: ${response.code()}")
            if (response.isSuccessful) {
                Logger.debug(response.body())
            }else{
                Logger.error(HttpException(response))
            }
        }

    }
    gitHubServiceApi.getUser("JakeWharton").enqueue(callback)
    Thread.sleep(3000)
}

运行结果:

15:28:33:274 [OkHttp https://api.github.com/...] GitHubServiceKt$main$1.onResponse(GitHubService.kt:36): onResponse: 200
15:28:33:292 [OkHttp https://api.github.com/...] GitHubServiceKt$main$1.onResponse(GitHubService.kt:38): User(id=66577, name=Jake Wharton, url=https://api.github.com/users/JakeWharton)

 
转换成可取消的挂起函数:

suspend fun getUser(name: String) = suspendCancellableCoroutine<User> { continuation ->
    val call = gitHubServiceApi.getUser(name)

    continuation.invokeOnCancellation {
        Logger.debug("invokeOnCancellation: cancel the request.")
        call.cancel()
    }

    call.enqueue(object : Callback<User> {
        override fun onFailure(call: Call<User>, t: Throwable) {
            Logger.debug("onFailure: $t")
            continuation.resumeWithException(t)
        }

        override fun onResponse(call: Call<User>, response: Response<User>) {
            Logger.debug("onResponse: ${response.code()}")
            if (response.isSuccessful) {
                response.body()?.let { continuation.resume(it) } ?: continuation.resumeWithException(
                    NullPointerException("User is Null")
                )
            } else {
                continuation.resumeWithException(HttpException(response))
            }
        }
    })
}

 
在协程中调用并且取消:

suspend fun main() {
    val job = GlobalScope.launch {
        Logger.debug(1)
        val user = getUser("JakeWharton")
        Logger.debug(user)
    }
    Logger.debug(2)
    job.cancelAndJoin()
    Logger.debug(3)
}

运行结果:

15:53:07:886 [main] ConsoleCancellableKt.main(ConsoleCancellable.kt:15): 2
15:53:07:886 [DefaultDispatcher-worker-1] ConsoleCancellableKt$main$job$1.invokeSuspend(ConsoleCancellable.kt:11): 1
15:53:08:450 [DefaultDispatcher-worker-1] GetUserKt$getUser$2$1.invoke(GetUser.kt:18): invokeOnCancellation: cancel the request.
15:53:08:453 [DefaultDispatcher-worker-1] ConsoleCancellableKt.main(ConsoleCancellable.kt:17): 3
15:53:08:454 [OkHttp https://api.github.com/...] GetUserKt$getUser$2$2.onFailure(GetUser.kt:24): onFailure: java.io.IOException: Canceled

从日志中看到,取消的回调被调用了,OkHttp 也确实停止了网络请求,并且回调给我们一个 IO 异常,这时候我们的协程已经被取消。

在处于 取消状态的协程 上调用 Continuation.resumeContinuation.resumeWithException 或者 Continuation.resumeWith 都会被忽略。
因此 OkHttp 回调中我们收到 IO 异常后调用的 continuation.resumeWithException(e) 不会有任何副作用。
 
Retrofit 2.6.0中使用了协程,也是用了同样的方式。

image
suspend fun launchForEach() {
    coroutineScope {
        launch {
            listOf("JakeWharton", "abreslav", "yole", "elizarov")
                .forEach {
                    try {
                        val user = gitHubServiceApi.getUser(it).await()
                        Logger.debug(user)
                    } catch (e: Exception) {
                        Logger.error(e)
                    }
                    delay(1000)
                }
        }.join()
    }
}
14:07:31:568 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:31:587 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=66577, name=Jake Wharton, url=https://api.github.com/users/JakeWharton)
14:07:32:931 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:32:933 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=888318, name=Andrey Breslav, url=https://api.github.com/users/abreslav)
14:07:34:285 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:34:287 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=46553, name=Dmitry Jemerov, url=https://api.github.com/users/yole)
14:07:35:626 [OkHttp https://api.github.com/...] GitHubServiceKt$gitHubServiceApi$2$retrofit$1.intercept(GitHubService.kt:19): request: 200
14:07:35:636 [main] Demo0Kt$launchForEach$2$1.invokeSuspend(Demo0.kt:16): User(id=478679, name=Roman Elizarov, url=https://api.github.com/users/elizarov)
Android中使用协程

引入协程库

dependencies {
  implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.2.2'
}

kotlinx-coroutines-android 依赖的库:

image
kotlinx-coroutines-android 这个库里面的代码很少,协程的大部分实现都在 kotlinx-coroutines-core 这个库里。

 
任务调度器

kotlinx-coroutines-android 提供了 Dispatchers.Main 调度器 在Android平台上的实现 HandlerDispatcher

image

可以从源码上看到,任务的调度都是通过 Handler 来实现:

image
 
UI 生命周期作用域

在Android中, 我们想让发出去的请求能够在当前 UI 或者 Activity 退出或者销毁的时候能够自动取消,就要与其生命周期绑定。
协程有一个很天然的特性能刚够支持这一点,那就是作用域。官方也提供了 MainScope 这个函数。

/**
 * Creates the main [CoroutineScope] for UI components.
 *
 * Example of use:
 * ```
 * class MyAndroidActivity {
 *   private val scope = MainScope()
 *
 *   override fun onDestroy() {
 *     super.onDestroy()
 *     scope.cancel()
 *   }
 * }
 *
 * ```
 *
 * The resulting scope has [SupervisorJob] and [Dispatchers.Main] context elements.
 * If you want to append additional elements to the main scope, use [CoroutineScope.plus] operator:
 * `val scope = MainScope() + CoroutineName("MyActivity")`.
 */
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

MainScope 的源码可以看到,MainScope 是由 SupervisorJob()Dispatchers.Main 共同完成。
使用 SupervisorJob 保证子任务失败不会导致父任务被取消;父任务能够取消所有的子任务。
使用 Dispatchers.Main 让协程体运行在主线程中。因此作用域内除非明确声明调度器,协程体都调度在主线程执行。

 
示例:

class MainActivity : AppCompatActivity() {
    private val mainScope = MainScope()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        button.setOnClickListener {
            //启动一个协程
            mainScope.launch {
                Logger.debug(1)
                textView.text = withContext(Dispatchers.IO) { //任务调度在IO线程池
                    Logger.debug(2)
                    delay(3000) //模拟耗时操作
                    Logger.debug(3)
                    "Hello Coroutines"
                }
                Logger.debug(4)
            }
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        mainScope.cancel() //Activity退出时取消协程
    }
}

运行结果:

D/Coroutine: [main] MainActivity$onCreate$1$1.invokeSuspend(MainActivity.kt:19): 1
D/Coroutine: [DefaultDispatcher-worker-1] MainActivity$onCreate$1$1$1.invokeSuspend(MainActivity.kt:21): 2
D/Coroutine: [DefaultDispatcher-worker-2] MainActivity$onCreate$1$1$1.invokeSuspend(MainActivity.kt:23): 3
D/Coroutine: [main] MainActivity$onCreate$1$1.invokeSuspend(MainActivity.kt:26): 4

 
带有作用域的抽象Activity

尽管我们在上面直接使用 MainScope 可以很方便的控制其作用域范围内的协程的取消,
以及能够无缝将异步任务切回主线程,这都是我们想要的特性,不过写法上还是不够美观。

官方推荐我们定义一个抽象的 Activity,例如:

abstract class ScopedActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onDestroy() {
        super.onDestroy()
        cancel()
    }
}

这样在 Activity 退出的时候,对应的作用域就会被取消,所有在该 Activity 中发起的请求都会被取消掉。
使用时,只需要继承这个抽象类即可:

class CoroutineActivity : ScopedActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        button.setOnClickListener {
            //启动一个协程
            launch {
                Logger.debug(1)
                textView.text = withContext(Dispatchers.IO) { //任务调度在IO线程池
                    Logger.debug(2)
                    delay(3000) //模拟耗时操作
                    Logger.debug(3)
                    "Hello Coroutines"
                }
                Logger.debug(4)
            }
        }
    }
}

除了在当前 Activity 内部获得 MainScope 的能力外,还可以将这个 作用域 实例传递给其他需要的模块。

例如 Presenter 通常也需要与 Activity保持同样的生命周期,因此必要时也可以将该作用域传递过去:

class CoroutineActivity : ScopedActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        button.setOnClickListener {
            //启动一个协程
            launch {
                Logger.debug(1)
                textView.text = withContext(Dispatchers.IO) {
                    //任务调度在IO线程池
                    Logger.debug(2)
                    delay(3000) //模拟耗时操作
                    Logger.debug(3)
                    "Hello Coroutines"
                }
                Logger.debug(4)
            }
        }
        next.text = "Get User Data"
        next.setOnClickListener {
            CoroutinePresenter(this@CoroutineActivity).getUserData(textView)
        }
    }
}

class CoroutinePresenter(private val scope: CoroutineScope) : CoroutineScope by scope {

    fun getUserData(textView: TextView) {
        Logger.debug(1)
        launch {
            delay(2000)
            Logger.debug(3)
            textView.text = "Kotlin Coroutines"
        }
        Logger.debug(2)
    }
}

多数情况下,Presenter 的方法也会被 Activity 直接调用,因此也可以将 Presenter 的方法生命成 suspend 方法,
然后用 coroutineScope 嵌套作用域,这样 MainScope 被取消后,嵌套的子作用域一样也会被取消,进而达到取消全部子协程的目的:

next.setOnLongClickListener {
    launch {
        val data =  CoroutinePresenter2().getUserData()
        Logger.debug("set User data")
        textView.text = data
        textView.setTextColor(Color.RED)
    }
    false
}
class CoroutinePresenter2 {

    suspend fun getUserData() = coroutineScope{
        Logger.debug("start get user data")
        delay(1000)
        val deferred = async(Dispatchers.IO) {
            delay(2000)
            Logger.debug("return user data")
            "User Data is Empty"
        }
        deferred.await()
    }
}

输出log:

D/Coroutine: [main] CoroutinePresenter2$getUserData$2.invokeSuspend(CoroutinePresenter2.kt:9): start get user data
D/Coroutine: [DefaultDispatcher-worker-1] CoroutinePresenter2$getUserData$2$deferred$1.invokeSuspend(CoroutinePresenter2.kt:13): return user data
D/Coroutine: [main] CoroutineActivity$onCreate$3$1.invokeSuspend(CoroutineActivity.kt:42): set User data

 
总结:

在 Android 上使用协程,更多的就是简化异步逻辑,把异步的代码改成同步的写法。
协程为我们提供了 Dispatchers.Main 调度器,让我们的UI 逻辑在 UI 线程中处理。
如果涉及到一些 io 操作,使用 async 将其调度到 Dispatchers.IO 上,结果返回时协程会自动帮我们切回到主线程。
对于一些 UI 不相关的逻辑,通常使用 Dispatchers.Default 就足够使用了。

上一篇下一篇

猜你喜欢

热点阅读