kotlin 协程的启动与取消

2023-08-13  本文已影响0人  Bfmall

一、启动构建器

launch与async构建器都用来启动新协程:

1、launch,返回一个Job,并且不附带任何结果值
2、async,返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到它的最终结果。

等待一个作业:

1、线程切换

    launch(Dispatchers.Default) {
        println("1")
        withContext(Dispatchers.IO) {
            delay(1000)
            println("2")
        }
        println("3")
    }

withContext 实现`异步线程同步化`,打印顺序是:1 2 3

2、join

    val job = launch {
        delay(1000)
        println("1")
    }
    job.join() // 等待job执行完毕
    launch {
        println("2")
    }

使用 join 函数实现等待效果。 以上代码的执行顺序是:1 2

3、await

    async {
        delay(1000)
        println("1")
    }.await()// 等待job执行完毕
    launch {
        println("2")
    }

  使用 await 函数实现等待效果。 以上代码的执行顺序是:1 2

4、async 组合并发

已知有两个耗时任务:

    suspend fun doOne(): Int {
        delay(1000)
        return 1
    }
    
    suspend fun doTwo(): Int {
        delay(1000)
        return 2
    }

现在开始执行这两个任务,计算两个挂起函数的返回值之和:

    val time = measureTimeMillis {
        val one = doOne()
        val two = doTwo()
        println("两数之和:" + (one + two))
    }
    println("两个任务耗时:$time")

以上程序的执行是顺序的,先执行 doOne,再执行 doTwo,总耗时为:2037毫秒。

这种做法往往不太可取,因为太过耗时,往往不推荐。

另一种方法是使用 async 实现:

    val time = measureTimeMillis {
        val one = async {
            doOne()
        }.await()
        val two = async {
            doTwo()
        }.await()
        println("两数之和:" + (one + two))
    }
    println("两个任务耗时:$time")

但是,两个任务仍然是顺序执行的,耗时和前者差不多。

为了解决两个任务不必要的耗时问题,推荐使用 async 组合并发:

    val time = measureTimeMillis {
        val job1 = async {
            doOne()
        }
        val job2 = async {
            doTwo()
        }
        val one = job1.await()
        val two = job2.await()
        println("两数之和:" + (one + two))
    }
    println("两个任务耗时:$time")

最终,耗时为 1068 毫秒。

二、启动模式

CoroutineStart.DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态;
CoroutineStart.ATOMIC(原子):协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消;
CoroutineStart.LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
CoroutineStart.UNDIPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到遇到一个真正被挂起的点。

取消的时机:

DEFAULT:调用cacel后,直接取消调度
ATOMIC:调用cacel后,当协程执行到第一个挂起函数时才会取消调度,挂起函数一般是耗时操作,
    第一个挂起函数之前是必须执行的代码,如果存在此场景,则使用 ATOMIC 启动模式
LAZY:当协程被使用的时候才开始调度,可以在调度前取消协程,也可以在调度过程中取消协程
UNDIPATCHED:不分发,即使指定了调度器,也会默认在当前函数的调用栈执行,而且是立即执行。

private fun test() = runBlocking {
    launch (context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
        println(Thread.currentThread().name)
    }
}

runBlocking 在主线程中执行,在协程中,即使指定了 Dispatchers.IO,依然在主线程中执行。

三、作用域构建器

runBlocking 和 coroutineScope:

1、runBlocking 是常规函数,而 coroutineScope 是挂起函数
2、它们都会等待其协程体以及所有子协程结束,主要区别在于 runBlocking 方法会阻塞当前线程来等待,
    而 coroutineScope 只是挂起,会释放底层线程用于其他用途

coroutineScope 和 supervisorScope:
1、coroutineScope 一个协程失败了,所有其它兄弟协程也会被取消
2、supervisorScope 一个协程失败了,不会影响其它兄弟协程

使用场景:

runBlocking:只可用于调试
coroutineScope:等待其协程体以及所有子协程结束,并且一个协程取消,其它兄弟协程全部取消的场景

四、Job的生命周期

协程的生命周期是由Job对象来管理的,job对象获取生命周期的方法有:

        job.isActive // 是否激活
        job.isCancelled // 是否取消
        job.isCompleted // 是否完成

一个协程包括的状态是:新建(New)、激活(Active)、完成中(Completing)、已完成(Completed)、取消中(Cacelling)、已取消(Cacelled)。

五、协程的取消

【1】取消作用域会取消它的子协程。

    // 定义一个协程作用域
    val scope = CoroutineScope(Dispatchers.Default)
    scope.launch {
        delay(1000)
        println(1)
    }
    scope.launch {
        delay(1000)
        println(2)
    }
    scope.cancel()

此时,两个协程全部被取消。

【2】被取消的子协程并不会影响其余兄弟协程

    // 定义一个协程作用域
    val scope = CoroutineScope(Dispatchers.Default)
    val job1 = scope.launch {
        delay(1000)
        println(1)
    }
    val job2 = scope.launch {
        delay(1000)
        println(2)
    }
    job1.cancel()

job1被取消,job2 没有被取消。

【3】协程通过抛出一个特殊的异常 CancellationException 来处理取消操作

    // 定义一个协程作用域
    val job = launch {
        delay(1000)
        println(1)
    }
    delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
    job.cancel()
    job.join()

delay 是一个挂起函数,当它被取消时,会报 CancellationException 异常,此时协程会直接被取消,我们可以利用 try...catch来捕获次异常:

    val job = launch {
        try {
            delay(1000) // 一个挂起函数,处理耗时任务
            println(1)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
    delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
    job.cancel()
    job.join()
}

此时,打印了异常:

kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@1b68ddbd

并且,协程被取消。

六、CPU密集型任务取消

    val job = launch(Dispatchers.Default) {
        var count = 0
        while (count < 1000000000) {
            count += 1
            if (count % 1000 == 0) {
                println(count)
            }
        }
    }
    delay(10)
    job.cancelAndJoin()

以上代码使用了while循环,执行时CPU高度运作,当协程执行密集型任务时,协程无法被取消。

这时,需要结合job的生命周期,修改后的代码如下:

    val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
        var count = 0
        while (count < 1000000000 && isActive) {
            count += 1
            if (count % 1000 == 0) {
                println(count)
            }
        }
    }
    delay(10)
    job.cancelAndJoin()

仅仅在 while 中 添加了 isActive,此时是可以被取消的。

还有一种方法是添加 ensureActive 实现:

    val job = launch(Dispatchers.Default) {  // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
        var count = 0
        while (count < 1000000000) {
            ensureActive()
            count += 1
            if (count % 1000 == 0) {
                println(count)
            }
        }
    }
    delay(10)
    job.cancelAndJoin()

使用 ensureActive 和 使用 isActive, 从效果上是差不多的。

isActive: 是一个可以被使用在 CoroutineScope 中的扩展属性,检查 Job 是否处于活跃状态。

ensureActive():如果Job处于非活跃状态,则抛出异常。

yield 函数会检查所在协程的状态,如果已经取消,则抛出异常。次外,它还会尝试让出线程的执行权,给其他协程协程提供执行机会。

  val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
      var count = 0
      while (count < 1000000000) {
          yield()
          count += 1
          if (count % 1000 == 0) {
              println(count)
          }
      }
  }
  delay(10)
  job.cancelAndJoin()

yield() 会让出线程执行权,其它线程执行完会继续执行。

比较:

1、isActive 和 ensureActive 可以让CPU密集型协程成功取消;
2、yield 既可以让CPU密集型协程成功取消,也可以让出一部分执行权给其它任务处理(当程序非常密集的时候使用)

七、协程取消的副作用

协程被取消时,会抛出异常,导致下面代码无法执行到,下面的代码可能是必须执行的逻辑,比如释放资源。

可以将必须要执行的代码放在 finally 中执行:

    val job = launch(Dispatchers.Default) {
        try {
            delay(1000)
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            // 释放资源
            println("release")
        }
    }
    delay(10)
    job.cancelAndJoin()

如果是文件操作,需要在 finally 中关闭资源:

    val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
    readBuffer.apply {
        try {
            var line: String? = null
            while (true) {
                line = readLine() ?: break
                println(line)
            }
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            close()
        }
    }

使用标准库中的 use 函数,可以简化代码:

    val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
    readBuffer.use {
        var line: String? = null
        while (true) {
            line = readLine() ?: break
            println(line)
        }
    }

use 函数中已经封装了try...catch,以及释放资源。

八、不能被取消的任务

    val job = launch(Dispatchers.Default) {
        withContext(NonCancellable) {
            delay(1000)
            println("1111111")
        }
        delay(1000)
        println("2222222222")
    }
    delay(10)
    job.cancelAndJoin()

使用 withContext(NonCancellable) ,任务可以不被取消。

九、超时任务

    val result = withTimeoutOrNull(1000) { // 1秒超时任务
        repeat(1000) {
            println(1111)
            delay(10)
        }
        "Success"
    } ?: "Failed"
    println(result)

使用 withTimeoutOrNull 实现超时任务。

作者:NoBugException
链接:https://www.jianshu.com/p/83148dc6b168
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

上一篇下一篇

猜你喜欢

热点阅读