Kotlin技术知识Kotlin编程Kotlin专题

开始使用Kotlin协程

2017-06-25  本文已影响2256人  登高而望远

本文主要介绍协程的用法, 以及使用协程能带来什么好处. 另外, 也会粗略提一下协程的大致原理.
本文的意义可能仅仅是让你了解一下协程, 并愿意开始使用它.
如果想彻底理解协程, 请查看官方文档, 官方文档链接将在文章的结尾给出.

如果你以前在别的语言里学习过协程, 如Python的yield, 那请你先忘记它们, 毕竟还是有些区别, 等你弄懂了Kotlin的协程, 再去作对比, 否则, 可能会有一些先入为主的思路来阻碍你理解, 我就吃过这个亏.

初识协程:

首先我们来瞄一眼协程是长啥样的, 以下引用(copy)了官网的一个例子:

fun main(args: Array<String>) {
    launch(CommonPool) {
        delay(1000L) 
        println("World!") 
    }
    println("Hello,")
    Thread.sleep(2000L)
}

/* 
运行结果: ("Hello,"会立即被打印, 1000毫秒之后, "World!"会被打印)
Hello, 
World!
*/

姑且不管里面具体的细节, 上面代码大体的运行流程是这样的:

A. 主流程:

  1. 调用系统的launch方法启动了一个协程, 跟随的大括号可以看做是协程体.
    (其中的CommonPool暂且理解成线程池, 指定了协程在哪里运行)
  2. 打印出"Hello,"
  3. 主线程sleep两秒
    (这里的sleep只是保持进程存活, 目的是为了等待协程执行完)

B. 协程流程:

  1. 协程延时1秒
  2. 打印出"World!"

解释一下delay方法:
在协程里delay方法作用等同于线程里的sleep, 都是休息一段时间, 但不同的是delay不会阻塞当前线程, 而像是设置了一个闹钟, 在闹钟未响之前, 运行该协程的线程可以被安排做了别的事情, 当闹钟响起时, 协程就会恢复运行.

协程启动后还可以取消
launch方法有一个返回值, 类型是Job, Job有一个cancel方法, 调用cancel方法可以取消协程, 看一个数羊的例子:

fun main(args: Array<String>) {
    val job = launch(CommonPool) {
        var i = 1
        while(true) {
            println("$i little sheep")
            ++i
            delay(500L)  // 每半秒数一只, 一秒可以输两只
        }
    }

    Thread.sleep(1000L)  // 在主线程睡眠期间, 协程里已经数了两只羊
    job.cancel()  // 协程才数了两只羊, 就被取消了
    Thread.sleep(1000L)
    println("main process finished.")
}

运行结果是:

1 little sheep
2 little sheep
main process finished.

如果不调用cancel, 可以数到4只羊.

协程的核心是suspend方法, 下面先讲解一下suspend方法, 之后再继续别的话题.

理解suspend方法:

suspend方法是协程的核心, 理解suspend方法是使用和理解协程的关键.
(suspend lambda和suspend方法差不多, 只是没有名字, 不再单独介绍了)

suspend方法的语法很简单, 只是比普通方法只是多了个suspend关键字:

suspend fun foo(): ReturnType {
    // ...
}

suspend方法只能在协程里面调用, 不能在协程外面调用.
suspend方法本质上, 与普通方法有较大的区别, suspend方法的本质是异步返回(注意: 不是异步回调). 后面我们会解释这句话的含义.

现在, 我们先来看一个异步回调的例子:

fun main(...) {
  requestDataAsync {
    println("data is $it")
  }
  Thead.sleep(10000L)  // 这个sleep只是为了保活进程
}

fun requestDataAsync(callback: (String)->Unit) {
    Thread() {
        // do something need lots of times.
        // ...
        callback(data)
    }.start()
}

逻辑很简单, 就是通过异步的方法拉一个数据, 然后使用这个数据, 按照以往的编程方式, 若要接受异步回来的数据, 唯有使用callback.
但是假如使用协程, 可以不使用callback, 而是直接把这个数据"return"回来, 调用者不使用callback接受数据, 而是像调用同步方法一样接受返回值. 如果上述功能改用协程, 将会是:

fun main(...) {
    launch(Unconfined) {  // 请重点关注协程里是如何获取异步数据的
        val data = requestDataAsync()  // 异步回来的数据, 像同步一样return了
        println("data is $it")
    }

    Thead.sleep(10000L) // 请不要关注这个sleep
}

suspend fun requestDataAsync() { // 请注意方法前多了一个suspend关键字
    return async(CommonPool) { // 先不要管这个async方法, 后面解释
        // do something need lots of times.
        // ...
        data  // return data, lambda里的return要省略
    }.await()
}

这里, 我们首先将requestDataAsync转成了一个suspend方法, 其原型的变化是:

  1. 在前加了个suspend关键字.
  2. 去除了原来的callback参数.

这里先不去深究这个方法的新实现, 后面会专门解释.
这里需要关注的点是: 在协程里面, 调用suspend方法, 异步的数据像同步一样般return了.
这是怎么做到的呢?
当程序执行到requestDataAsync内部时, 通过async启动了另外一个新的子协程去拉取数据, 启动这个新的子协程后, 当前的父协程就挂起了, 此时requestDataAsync还没有返回.
子协程一直在后台跑, 过了一段时间, 子协程把数据拉回来之后, 会恢复它的父协程, 父协程继续执行, requestDataAsync就把数据返回了.

为了加深理解, 我们来对比一下另一个例子: 不使用协程, 将异步方法也可以转成同步的方法(在单元测试里, 我们经常这么做):

fun main(...) {
    val data = async2Sync()  // 数据是同步返回了, 但是线程也阻塞了
    println("data is $it")
    // Thead.sleep(10000L)  // 这一句在这里毫无意义了, 注释掉
}

private var data = ""
private fun async2Sync(): String {
    val obj = Object() // 随便创建一个对象当成锁使用
    requestDataAsync { data ->
        this.data = data  // 暂存data
        synchronized(locker) {
            obj.notifyAll() // 通知所有的等待者
        }
    }
    obj.wait() // 阻塞等待
    return this.data
}

fun requestDataAsync(callback: (String)->Unit) {
    // ...普通的异步方法
}

注意对比上一个协程的例子, 这样做表面上跟它是一样的, 但是这里main方法会阻塞的等待async2Sync()方法完成. 同样是等待, 协程就不会阻塞当前线程, 而是自己主动放弃执行权, 相当于遣散当前线程, 让它去干别的事情去.

为了更好的理解这个"遣散"的含义, 我们再来看一个例子:

fun main(args: Array<String>) {
    // 1. 程序开始
    println("${Thread.currentThread().name}: 1");  

    // 2. 启动一个协程, 并立即启动
    launch(Unconfined) { // Unconfined意思是在当前线程(主线程)运行协程
        // 3. 本协程在主线程上直接开始执行了第一步
        println("${Thread.currentThread().name}: 2");  

        /* 4. 本协程的第二步调用了一个suspend方法, 调用之后, 
         * 本协程就放弃执行权, 遣散运行我的线程(主线程)请干别的去.
         * 
         * delay被调用的时候, 在内部创建了一个计时器, 并设了个callback.
         * 1秒后计时器到期, 就会调用刚设置的callback.
         * 在callback里面, 会调用系统的接口来恢复协程. 
         * 协程在计时器线程上恢复执行了. (不是主线程, 跟Unconfined有关)
         */
        delay(1000L)  // 过1秒后, 计时器线程会resume协程

        // 7. 计时器线程恢复了协程, 
        println("${Thread.currentThread().name}: 4")
    }

    // 5. 刚那个的协程不要我(主线程)干活了, 所以我继续之前的执行
    println("${Thread.currentThread().name}: 3");

    // 6. 我(主线程)睡2秒钟
    Thread.sleep(2000L)

    // 8. 我(主线程)睡完后继续执行
    println("${Thread.currentThread().name}: 5");
}

运行结果:

main: 1
main: 2
main: 3
kotlinx.coroutines.ScheduledExecutor: 4
main: 5

上述代码的注释详细的列出了程序运行流程, 看完之后, 应该就能明白 "遣散" 和 "放弃执行权" 的含义了.

Unconfined的含义是不给协程指定运行的线程, 逮到谁就使用谁, 启动它的线程直接执行它, 但被挂起后, 会由恢复它的线程继续执行, 如果一个协程会被挂起多次, 那么每次被恢复后, 都可能被不同线程继续执行.

现在再来回顾刚刚那句: suspend方法的本质就是异步返回.
含义就是将其拆成 "异步" + "返回":

调用suspend方法的详细流程是:
在协程里, 如果调用了一个suspend方法, 协程就会挂起, 释放自己的执行权, 但在协程挂起之前, suspend方法内部一般会启动了另一个线程或协程, 我们暂且称之为"分支执行流"吧, 它的目的是运算得到一个数据.
当suspend方法里的*分支执行流"完成后, 就会调用系统API重新恢复协程的执行, 同时会数据返回给协程(如果有的话).

__为什么不能再协程外面调用suspend方法? __
suspend方法只能在协程里面调用, 原因是只有在协程里, 才能遣散当前线程, 在协程外面, 不允许遣散, 反过来思考, 假如在协程外面也能遣散线程, 会怎么样, 写一个反例:

fun main(args: Array<String>) {
    requestDataSuspend(); 
    doSomethingNormal();
}
suspend fun requestDataSuspend() { 
    // ... 
}
fun doSomethingNormal() {
    // ...
}

requestDataSuspend是suspend方法, doSomethingNormal是正常方法, doSomethingNormal必须等到requestDataSuspend执行完才会开始, 后果main方法失去了并行的能力, 所有地方都失去了并行的能力, 这肯定不是我们要的, 所以需要约定只能在协程里才可以遣散线程, 放弃执行权, 于是suspend方法只能在协程里面调用.

概念解释: Continuation 与 suspension point

----个人建议专有名词别翻译成中文, 否则很容易因为断句错误而产生误解

协程的执行其实是断断续续的: 执行一段, 挂起来, 再执行一段, 再挂起来, ...
每个挂起的地方是一个suspension point, 每一小段执行是一个Continuation.
协程的执行流被它的 "suspension point" 分割成了很多个 "Continuation" .
我们可以用一条画了很多点的线段来表示:

协程的执行流分段

其中的<u>Continuation 0</u>比较特殊, 是从起点开始, 到第一个suspension point结束, 由于它的特殊性, 又被称为Initial Continuation.

协程创建后, 并不总是立即执行, 要分是怎么创建的协程, 通过launch方法的第二个参数是一个枚举类型CoroutineStart, 如果不填, 默认值是DEFAULT, 那么久协程创建后立即启动, 如果传入LAZY, 创建后就不会立即启动, 直到调用Job的start方法才会启动.

suspension point只是一个概念, 而Continuation在Kotlin里有一个对应interface, 关于这个interface后面再介绍.

封装异步回调方法

在没有协程的世界里, 通常异步的方法都需要接受一个callback用于发布运算结果.
在协程里, 所有接受callback的方法, 都可以转成不需要callback的suspend方法.

上面的requestDataSuspend方法就是一个这样的例子, 我们回过头来再看一眼:

suspend fun requestDataSuspend() {
    return async(CommonPool) {
        // do something need lots of times.
        // ...
        data  // return data
    }.await()
}

其内部通过调用了async和await方法来实现(关于async和await我们后面再介绍), 这样虽然实现功能没问题, 但并不最合适的方式, 上面那样做只是为了追求最简短的实现, 合理的实现应该是调用suspendCoroutine方法, 大概是这样:

suspend fun requestDataSuspend() {
    suspendCoroutine { cont ->
        // ... 细节暂时省略
    }
}
// 可简写成:
suspend fun requestDataSuspend() = suspendCoroutine { cont ->
    // ...
}

在完整实现之前, 需要先理解suspendCoroutine方法, 它是Kotlin标准库里的一个方法, 原型如下:

suspend fun <T> suspendCoroutine(block: (Continuation<T>) -> Unit): T

后面我们讨论Kotlin协程官方API的时候就会知道, 这是Kotlin标准库里用于支持协程的底层API非常少(大多数API不在标准库, 而是在应用层的扩展库, 如上面的launch方法), 这是其中一个.
suspendCoroutine的作用就是将当前执行流挂起, 在适合的时机再将协程恢复执行, 我们可以看到他的参数是一个lambda, lambda的参数是一个Continuation, 我们刚刚其实已经提到过Continuation了, 它表示一段执行流, 这里就不做过多解释了, 这个方法里的<u>Continuation实例</u>代表的执行流是从当前的suspension point开始, 到下一个suspension point结束, 当前的suspension point就是调用suspendCoroutine这一刻.
调用suspendCoroutine之后, 当前的执行流会挂起(调用suspendCoroutine的线程会遣散, 但不是整个进程都挂起, 不然谁做事呢), 然后开另一个执行流去做异步的事情, 等到异步的事情做完, 当前的执行流又会恢复, 下面看一下是如何恢复的.
suspendCoroutine的会自动捕获当前的执行环境(如临时变量, 参数等), 然后存放到一个Continuation中, 并且作为参数传给它的lambda.
之前已经提到Continuation是标准库里的一个interface, 它的原型是:

interface Continuation<in T> {
   val context: CoroutineContext // 暂时不管这个
   fun resume(value: T)
   fun resumeWithException(exception: Throwable)
}

它有两个方法resumeresumeWithException:

现在来完善一下刚刚的例子:

suspend fun requestDataSuspend() = suspendCoroutine { cont ->
    requestDataFromServer { data -> // 普通方法还是通过callback接受数据
        if (data != null) {
            cont.resume(data)
        } else {
            cont.resumeWithException(MyException())
        }
    }
}

/** 普通的异步回调方法 */
fun requestDataFromServer(callback: (String)->Unit) {
    // ... get data from server, it will call back when finished.
}

逻辑很简单, 如果data有效就正常恢复, 否则异常恢复.
但这里需要注意的是: 传给resume的参数会变成suspendCoroutine的返回值, 进而成为了requestDataSuspend方法的返回值.
这个地方太神奇了, Kotlin是如何做到的呢?, 估计短时间也难以理解, 先记住吧.
suspendCoroutine有个特点:

suspendCoroutine { cont ->
    // 如果本lambda里返回前, cont的resume和resumeWithException都没有调用
    // 那么当前执行流就会挂起, 并且挂起的时机是在suspendCoroutine之前
    // 就是在suspendCoroutine内部return之前就挂起了
    
    // 如果本lambda里返回前, 调用了cont的resume或resumeWithException
    // 那么当前执行流不会挂起, suspendCoroutine直接返回了, 
    // 若调用的是resume, suspendCoroutine就会像普通方法一样返回一个值
    // 若调用的是resumeWithException, suspendCoroutine会抛出一个异常
    // 外面可以通过try-catch来捕获这个异常
}

回过头来看一下, 刚刚的实现有调用resume方法吗, 我们把它折叠一下:

suspend fun requestDataSuspend() = suspendCoroutine { cont ->
    requestDataFromServer { ... }
}

清晰了吧, 没有调用, 所以suspendCoroutine还没有返回之前就挂起了, 但是挂起之前lambda执行完了, lambda里调用了requestDataFromServer, requestDataFromServer里启动了真正做事的流程(异步执行的), 而suspendCoroutine则在挂起等待.
等到requestDataFromServer完成工作, 就会调用传入的callback, 而这个callback里调用了cont.resume(data), 外层的协程就恢复了, 随后suspendCoroutine就会返回, 返回值就是data.

大家一定很好奇Kotlin内部是如何实现的, 要像彻底了解其中的奥妙, 还是要看官方文档和代码, 这里只简单介绍一下大致原理, 太细的我也不懂, 大家凑合着看一下, 看不懂也没关系.
在Kotlin内部, 协程被实现成了一个状态机, 状态的个数就是suspension point的个数+1(初始状态), 当前的状态就是当前的suspension point, 当调用resume时, 就会执行下一个Continuation.

估计大家这个时候应该是似懂非懂, 其实作为使用者, 这已经够了, 但是要深入研究, 还是靠自己研究代码.

async/await模式:

我们前面多次使用了launch方法, 它的作用是创建协程并立即启动, 但是有一个问题, 就是通过launch方法创建的协程都没办法携带返回值. async之前也出现过, 但一直没有详细介绍.

async方法作用和launch方法基本一样, 创建一个协程并立即启动, 但是async创建的协程可以携带返回值.
launch方法的返回值类型是Job, async方法的返回值类型是Deferred, 是Job的子类, Deferred里有个await方法, 调用它可得到协程的返回值.

async/await是一种常用的模式, async的含义是启动一个异步操作, await的含义是等待这个异步操作结果.
是谁要等它啊, 在传统的不使用协程的代码里, 是线程在等(线程不干别的事, 就在那里傻等). 在协程里不是线程在等, 而且是执行流在等, 当前的流程挂起(底下的线程会被遣散去干别的事), 等到有了运算结果, 流程才继续运行.
所以我们又可以顺便得出一个结论: 在协程里执行流是线性的, 其中的步骤无论是同步的还是异步的, 后面的步骤都会等前面的步骤完成.
我们可以通过async起多个任务, 他们会同时运行, 我们之前使用的async姿势不是很正常, 下面看一下使用async正常的姿势:

fun main(...) {
    launch(Unconfined) {
        // 任务1会立即启动, 并且会在别的线程上并行执行
        val deferred1 = async { requestDataAsync1() }
        
        // 上一个步骤只是启动了任务1, 并不会挂起当前协程
        // 所以任务2也会立即启动, 也会在别的线程上并行执行
        val deferred2 = async { requestDataAsync2() }

        // 先等待任务1结束(等了约1000ms), 
        // 然后等待任务2, 由于它和任务1几乎同时启动的, 所以也很快完成了
        println("data1=$deferred2.await(), data2=$deferred2.await()")
    }

    Thead.sleep(10000L) // 继续无视这个sleep
}

suspend fun requestDataAsync1(): String {
    delay(1000L)
    return "data1"    
}
suspend fun requestDataAsync2(): String {
    delay(1000L)
    return "data2"    
}

运行结果很简单, 不用说了, 但是协程总耗时是多少呢, 约1000ms, 不是2000ms, 因为两个任务是并行运行的.
有一个问题: 假如任务2先于任务1完成, 结果是怎样的呢?
答案是: 任务2的结果会先保存在deferred2里, 当调用deferred2.await()时, 会立即返回, 不会引起协程挂起, 因为deferred2已经准备好了.
所以, suspend方法并不总是引起协程挂起, 只有其内部的数据未准备好时才会.

需要注意的是: await是suspend方法, 但async不是, 所以它才可以在协程外面调用, async只是启动了协程, async本身不会引起协程挂起, 传给async的lambda(也就是协程体)才可能引起协程挂起.

async/await模式在别的语言里, 被实现成了两个关键字, 但在Kotlin里只是两个很平常的方法.

Generators介绍:

学习Python的协程的时候, 最先学习的就是Generators, 它的作用就是通过计算产生序列, 而不用通过列表之类存储机制. 以下通过Generators产生斐波那契序列:

// inferred type is Sequence<Int>
val fibonacci = buildSequence {
    yield(1) // first Fibonacci number
    var cur = 1
    var next = 1
    while (true) {
        yield(next) // next Fibonacci number
        val tmp = cur + next
        cur = next
        next = tmp
    }
}

fun main(...) {
    launch(Unconfined) {  // 请重点关注协程里是如何获取异步数据的
        fibonacci.take(10).forEach { print("$it, ") }
    }

    Thead.sleep(10000L) // 请不要关注这个sleep
}

// will print 1, 1, 2, 3, 5, 8, 13, 21, 34, 55,

我觉得这个没什么好解释的, yield是一个suspend方法, 放弃执行权, 并将数据返回.
根据前面的知识, 我们可以推断出, yield内部肯定最终会调用到Continuation的resume方法.

yield在别的语言, 一般是一个关键字, Kotlin中也是一个方法.

yield是标准库里的API, 大多数情况我们不需要直接调用这个方法, 使用kotlinx.coroutines里面的Channelproduce方法更加方法. 具体可以参考这里:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#building-channel-producers
序列的产生跟RX其实有点像, 但也是区别的, 具体可以参考这里:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/coroutines-guide-reactive.md

目前没有发现特别需要使用Generators的场景, 所以这里不做太多讨论.

协程API说明:

Kotlin的开发者对协程的实现比较独特, 语言机制本身只增加了极少的关键字, 标准库也只有极少的API, 但这并不代表功能少, 根据Kotlin的设计, 很多功能型API都放到了更上层的应用库里去实现.
只将少量核心的机制才放到语言本身和标准库上. 这样做不仅使得语言更简单, 而且灵活性更强.

Kotlin官方对协程提供的三种级别的能力支持, 分别是: 最底层的语言层, 中间层标准库(kotlin-stdlib), 以及最上层应用层(kotlinx.coroutines).

应用层:
这一层是我们的程序直接调用的层, 提供一些常用的实现方法, 如launch方法, async方法等, 它的实现在kotlinx.coroutines里面.

标准库:
标准库仅仅提供了少量创建协程的方法, 位于:
kotlin.coroutines.experimental:
-- createCoroutine()
-- startCoroutine()
-- suspendCoroutine()

到目前为止, 我们直接使用到的只有suspendCoroutine方法.
launch和async方法的实现里最终调用了startCoroutine方法.
Generators里的buildSequence方法, 最终会调用buildSequence来实现.

语言层:
语言本身主要提供了对suspend关键字的支持, Kotlin编译器会对suspend修饰的方法或lambda特殊处理, 生成一些中间类和逻辑代码.

我们平常用到的基本都是应用层的接口, 应用层提供了很多非常核心功能, 这些功能在其他语言里大多是通过关键字来实现的, 而在Kotlin里, 这些都是实现成了方法.

总结


协程是什么:

看了这么多例子, 我们现在可以总结一下协程是什么, 协程到底是什么, 很难给出具体的定义, 就算能给出具体定义, 也会非常抽象难以理解的.
另一方面, 协程可以说是编译器的能力, 因为协程并不需要操作系统和硬件的支持(线程需要), 是编译器为了让开发者写代码更简单方便, 提供了一些关键字, 并在内部自动生成了一些支持型代码(可能是字节码).

以下我个人的总结:

协程和线程区别与关系:

线程和协程的目的本质上存在差异:

在调度上, 协程跟线程也不同:

作用上的不同:

协程与线程的关系:
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程. 可总结如下:

  1. 线程是协程的资源.
  2. 协程通过Interceptor来间接使用线程这个资源.

结语:

如果需要经常使用协程, 建议抽时间看一下官方文档.
最后, 感谢大家的阅读, 希望本文对你有所帮助 !


官方英文文档连接:

  1. 官方的协程简介
  2. 完整版的使用指南
  3. 当前的实现方案详解

_
第1个页面是是官方指南的子页面, 第2个和第3个分别是两个GitHub项目里面的markdown文档, 他们所在的工程还包含其他文档, 有需要可以自行浏览.
另外, 提个建议: 如果看着看着卡壳了, 可以跳过或查阅另外几个文档, 以后再回过来看, 别的文档有可能会用别的方式或别的例子来描述了同一个东西)
_

上一篇 下一篇

猜你喜欢

热点阅读