协程

Kotlin - 协程 简介

2020-08-14  本文已影响0人  Whyn

[TOC]

Mind Map

简介

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
协程(英语:coroutine)是计算机程序的一类组件,推广了协作式多任务的子程序,允许执行被挂起与被恢复。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。协程更适合于用来实现彼此熟悉的程序组件,如协作式多任务、异常处理、事件循环、迭代器、无限列表和管道。

以上是维基百科协程的定义。

简单来讲,协程是一种轻量级线程。

相对于线程切换是由操作系统进行调度的,程序员无法进行控制。
而协程的调度是由程序员在代码层面上进行控制的,程序员可以通过控制suspend函数的挂起和恢复,从而控制程序运行流程,这在代码的展示上,相当于用同步的代码书写异步程序,代码逻辑非常简洁易懂。

理论上,由于协程不涉及到操作系统调度,因此只是在用户态上进行操作,而线程需要经历用户态与内核态之间的切换,所以协程性能更佳。
但是不同的语言在实现上可能存在差异,比如本文所要介绍的 Kotlin 下的协程,其在 JVM 平台上的内部实现也是基于线程,因此如果其进行了协程调度,存在一定的可能是进行了线程切换。

协程经常拿来与线程进行对比,它们彼此很相似,但是也很不同。
可以简单理解如下:

由于一个线程可以包含多个协程,而协程具备挂起和恢复功能,也因此让我们具备了在一个线程上执行多个异步任务的能力。

:还是如上文所言,Kotlin 协程的底层实现存在线程切换,因此异步任务可能执行在另一条线程上。

名词释义

在具体介绍协程之前,需要先了解一下以下几个概念:

同步与异步是针对返回结果来说的,
对于同步调用,由调用者主动获取结果。
对于异步调用,调用者是通过回调等方式被动获取结果的。

简单理解,比如对于一个函数调用,
同步调用就是调用函数后,直接就可以获取结果。
异步调用就是调用函数后,不关心结果,等函数体内的任务结束时,通过回调等方式通知调用者结果。

阻塞和非阻塞是针对当前线程是否具备 CPU 执行权来说的,
对于阻塞调用,调用不立即返回,当前线程被挂起,失去 CPU 执行权,直至调用任务完成,返回结果。
对于非阻塞调用,调用立即返回,当前线程仍然拥有 CPU 执行权,可继续执行后续代码。

:可以讲上述描述中的 任务 理解为 函数,更加直观。

协程涉及到的一些概念

基本使用

下面用代码来创建一个最简单的协程,了解一下基本的使用。具体步骤如下:

协程作用域

每个协程都拥有自己的作用域范围,kotlinx.coroutines 提供了多种协程作用域,以下介绍常见的几种:

创建协程

kotlinx.coroutines 提供了多种协程构造器让我们创建协程。以下列举一些常见的协程创建方式:

协程上下文与调度器(Coroutine Context and Dispatchers)

协程上下文是一系列不同元素的集合。其中主要元素是协程的Job

协程会一直运行在某个协程上下文CoroutineContext中。

launch/async等协程构造器未显示指定CoroutineContext时,它会继承父协程的上下文,且异步任务会运行在父协程上下文指定的线程环境中。

GlobalScope.launch默认使用的调度器为Dispatchers.Default

:为了更好地查看调度器调度结果,可以在执行代码时,为 JVM 选项添加开启协程调式模式:-Dkotlinx.coroutines.debug,然后使用如下代码就可以在运行时打印出协程实例和其运行所在的线程:

fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")

为了更方便查看协程,在开启协程调式模式后,也可以通过CoroutineName为新创建的协程添加名称,如下所示:

suspend fun main() {
    coroutineScope {
        log("set name for a coroutine")
        launch(CoroutineName("son-routine")) {
            log("this coroutine name is son-routine")
        }
    }
}

其运行结果如下图所示:


:协程上下文CoroutineContext覆写了plus操作符,因此我们可以通过+号连接多个协程上下文元素,如下所示:

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

取消机制

kotlinx.coroutines 提供的所有的suspend函数都是可以进行 取消(cancellable) 的。他们会检测协程取消状态,当检测到取消操作时,就会抛出CancellationException异常,我们可在代码块中对该异常进行捕获。

如下所示:

suspend fun main() = coroutineScope {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L) // delay will detect cancellation for coroutine
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit.")
}

上述代码运行结果如下:

:取消操作能成功的前提是:协程内部会对取消状态进行检测

因此,并不是说我们调用cancel后,就一定能取消协程运行,比如,对于协程内部进行 CPU 密集型计算的操作,就无法进行取消,如下所示:

suspend fun outputTwicePerSecond(){
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}

suspend fun main() = coroutineScope {
    val job = launch(Dispatchers.Default) {
        outputTwicePerSecond()
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

上述代码运行结果如下:

可以看到,在我们调用了cancel操作后,子协程异步任务仍继续执行,直到完成。

其原因就是子协程异步任务中没有对取消状态进行检测。解决的方法就是要么在异步任务循环部分中调用suspend函数(yield是一个不错的选择),要么就手动对取消状态进行检测,如下所示:

// 调用 suspend 函数
suspend fun outputTwicePerSecond(){
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
        yield()
    }
}

// 或者对 取消状态 进行检测
fun CoroutineScope.outputTwicePerSecond() {
    var startTime = System.currentTimeMillis()
    var nextPrintTime = startTime
    var i = 0
    while (this.isActive) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}

异常处理

前面内容讲过,要关闭一个协程,使用的是cancel操作,实质上,取消操作与异常是紧密联系的,当我们调用cancel时,协程内部其实是抛出了一个CancellationException异常,从而打断协程运行,但是这个异常默认会被异常处理器CoroutineExceptionHandler忽略,不过我们可以通过catch块进行捕获,如下所示:

fun main() = runBlocking {
    val child = launch {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            log("子协程停止运行!")
        }
    }
    // 让子协程启动
    yield()
    log("取消子协程")
    child.cancel()
    child.join()
    // 释放父协程执行权
    delay(TimeUnit.SECONDS.toMillis(1))
    log("父协程仍在运行")
}

上述代码运行结果如下:

根据上图可以直到,对于取消操作,实质上就是抛出了一个CancellationException异常,所以可以捕获得到,且子协程抛出的CancellationException不会阻断父协程的执行。

以上我们已经知道协程对于CancellationException异常的处理,那么对于协程内部抛出的其他异常,或者是子协程发生异常,则又是怎样个处理方法呢?如下代码所示:

suspend fun main() {
    GlobalScope.launch {

        log("父协程启动")
        val child = launch {
            log("子协程抛出异常")
            throw ArithmeticException("Div 0")
        }
        // 让子协程启动
        yield()
        child.join()
        // 释放父协程执行权
        delay(TimeUnit.SECONDS.toMillis(1))
        log("父协程仍在运行")
    }.join()
}

这次抛出的是普通异常ArithmeticException,其运行结果如下所示:

可以看到,子协程抛出非CancellationException异常,会打断父协程(取消父协程)运行。具体的过程涉及的内容如下:

通道(Channel)

Channel 可以让数据流动在不同的协程中,提供了协程间通信机制。

本质上,Channel 底层就是一个线程安全的队列(类似BlockingQueue),一个协程可以往里面发送数据,另一个协程就可以在里面获取数据,完成通信过程。如下所示:

suspend fun main() {
    val channel = Channel<Int>()

    var producer = GlobalScope.launch {
        for (i in 1..10) {
            log("channel send $i")
            channel.send(i)
        }
    }

    var consumer = GlobalScope.launch {
        repeat(10) {
            val value = channel.receive()
            log("channel receiver $value")
        }
    }
    producer.join()
    consumer.join()
}

上述代码是基于Channel实现的一个简单的生产者-消费者模式。

sendreceive函数都是suspend函数,因此:

下面对Channel的一些基本操作进行简介:

冷数据流 Flow

一个suspend函数可以异步返回一个数值,而借助 异步流,我们就可以返回多个数值。

Kotlin 中提供了以下方法可以让我们返回多个数值:

下面对Flow的一些特性进行简介:

更多操作符内容,,请参考:Asynchronous Flow

协程并发安全

由于 Kotlin 协程在 JVM 上的底层实现是基于线程的,因此协程间状态共享存在数据竞争问题,此时需要进行数据同步操作。

如果协程运行不同的调度器中(或者运行在多线程调度器中,比如Dispatchers.Default),则其异步任务实际上运行在不同的线程上,此时通过 Java 自带的一些线程安全操作(比如加锁,原子类...),就可以保证协程并发安全。

但是协程本身也提供了一些并发安全措施,主要有如下几方面内容:

Select Expression (experimental)

Select 表达式,即 多路复用,与 Unix 中的 IO 多路复用功能相似。

Kotlin 中的 多路复用select,可以对多个协程某些操作进行复用,比如:

多路复用select 除了支持上述的onAwaitonReceive事件外,它还支持很多其他事件,比如onJoinonSend...
事实上,所有能被select支持的事件都是SelectClasueN类型,具体包含以下几种:

...

更多具体内容,请参考:Select Expression

综上,多路复用select 的主要作用就是可以同时等待多个协程,并选择获取最先可用协程的结果。

一些注意点

  1. 对一个父协程进行取消操作,会自动取消它作用域内所有的协程:

    suspend fun showStatus(type: String) {
        log("$type starts")
        try {
            delay(100)
            log("$type done")
        } finally {
            log("$type cancelled")
        }
    }
    
    fun main() = runBlocking {
    
        val scope = CoroutineScope(Dispatchers.Default)
        scope.launch {
            launch { showStatus("launch") }
    
            launch {
                async { showStatus("async") }.await()
            }
    
            launch {
                withContext(Dispatchers.IO) { showStatus("withContext") }
            }
    
            coroutineScope { showStatus("coroutineScope") }
            // ...
        }
    
        delay(80)
        log("父协程取消")
        //  取消父协程,导致其作用域内所有协程取消
        scope.cancel()
    }
    

    当调用最外层scope.cancel()时,其作用域内所有子协程、子子协程...都会被取消。结果如下图所示:

  2. 创建一个协程时,可以为其上下文对象添加一个Job对象,对该Job对象调用取消操作时,与该Job绑定的协程及其子协程都会被取消:

    fun main() = runBlocking {
    
        val job = Job()
        // 将协程与 Job 绑定
        val scope = CoroutineScope(Dispatchers.Default + job)
        scope.launch {
            // ...
        }
    
        delay(80)
        //  取消 Job 绑定的协程,导致其作用域内所有协程取消
        job.cancel()
    }
    

    上述代码与第 1 点的代码基本一致,只是将CoroutineScopeJob绑定了起来,因此,调用Job.cancel()就可以取消与其绑定的协程,从而该协程作用域内的所有协程也都会被取消。

  3. 协程作用域内应当使用launchasynccoroutineScopesupervisorScope这些能继承父协程上下文对象或者withContext等具备结构化并发特性的协程构造器来创建子协程,这样才能满足上述 1,2 条目结果,不要使用GlobalScope.launchCoroutineScope.launch等创建子协程,因为这些操作创建的协程运行在其他作用域内,跳脱了当前协程作用域,导致当前协程无法对其进行控制:

    fun main() = runBlocking {
    
        val scope = CoroutineScope(Dispatchers.Default)
        scope.launch {
    
            GlobalScope.launch {
                showStatus("GlobalScope")
            }
    
            CoroutineScope(Dispatchers.Default).launch {
                showStatus("CoroutineScope")
            }
        }
    
        delay(80)
        log("父协程取消")
        scope.cancel()
        // 避免程序退出
        delay(1000)
    }
    

    结果如下图所示:

    可以看到,父协程取消,子协程仍在运行,父协程不具备控制子协程能力。

参考

上一篇 下一篇

猜你喜欢

热点阅读