kotlin<第八篇>:协程的启动与取消
一、启动构建器
launch与async构建器都用来启动新协程:
1、launch,返回一个Job,并且不附带任何结果值
2、async,返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到它的最终结果。
等待一个作业:
1、线程切换
launch(Dispatchers.Default) {
println("1")
withContext(Dispatchers.IO) {
delay(1000)
println("2")
}
println("3")
}
withContext 实现`异步线程同步化`,打印顺序是:1 2 3
2、join
val job = launch {
delay(1000)
println("1")
}
job.join() // 等待job执行完毕
launch {
println("2")
}
使用 join 函数实现等待效果。 以上代码的执行顺序是:1 2
3、await
async {
delay(1000)
println("1")
}.await()// 等待job执行完毕
launch {
println("2")
}
使用 await 函数实现等待效果。 以上代码的执行顺序是:1 2
4、async 组合并发
已知有两个耗时任务:
suspend fun doOne(): Int {
delay(1000)
return 1
}
suspend fun doTwo(): Int {
delay(1000)
return 2
}
现在开始执行这两个任务,计算两个挂起函数的返回值之和:
val time = measureTimeMillis {
val one = doOne()
val two = doTwo()
println("两数之和:" + (one + two))
}
println("两个任务耗时:$time")
以上程序的执行是顺序的,先执行 doOne,再执行 doTwo,总耗时为:2037毫秒。
这种做法往往不太可取,因为太过耗时,往往不推荐。
另一种方法是使用 async 实现:
val time = measureTimeMillis {
val one = async {
doOne()
}.await()
val two = async {
doTwo()
}.await()
println("两数之和:" + (one + two))
}
println("两个任务耗时:$time")
但是,两个任务仍然是顺序执行的,耗时和前者差不多。
为了解决两个任务不必要的耗时问题,推荐使用 async 组合并发:
val time = measureTimeMillis {
val job1 = async {
doOne()
}
val job2 = async {
doTwo()
}
val one = job1.await()
val two = job2.await()
println("两数之和:" + (one + two))
}
println("两个任务耗时:$time")
最终,耗时为 1068 毫秒。
二、启动模式
CoroutineStart.DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态;
CoroutineStart.ATOMIC(原子):协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消;
CoroutineStart.LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
CoroutineStart.UNDIPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到遇到一个真正被挂起的点。
取消的时机:
DEFAULT:调用cacel后,直接取消调度
ATOMIC:调用cacel后,当协程执行到第一个挂起函数时才会取消调度,挂起函数一般是耗时操作,
第一个挂起函数之前是必须执行的代码,如果存在此场景,则使用 ATOMIC 启动模式
LAZY:当协程被使用的时候才开始调度,可以在调度前取消协程,也可以在调度过程中取消协程
UNDIPATCHED:不分发,即使指定了调度器,也会默认在当前函数的调用栈执行,而且是立即执行。
private fun test() = runBlocking {
launch (context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
println(Thread.currentThread().name)
}
}
runBlocking 在主线程中执行,在协程中,即使指定了 Dispatchers.IO,依然在主线程中执行。
三、作用域构建器
runBlocking 和 coroutineScope:
1、runBlocking 是常规函数,而 coroutineScope 是挂起函数
2、它们都会等待其协程体以及所有子协程结束,主要区别在于 runBlocking 方法会阻塞当前线程来等待,
而 coroutineScope 只是挂起,会释放底层线程用于其他用途
coroutineScope 和 supervisorScope:
1、coroutineScope 一个协程失败了,所有其它兄弟协程也会被取消
2、supervisorScope 一个协程失败了,不会影响其它兄弟协程
使用场景:
runBlocking:只可用于调试
coroutineScope:等待其协程体以及所有子协程结束,并且一个协程取消,其它兄弟协程全部取消的场景
四、Job的生命周期
协程的生命周期是由Job对象来管理的,job对象获取生命周期的方法有:
job.isActive // 是否激活
job.isCancelled // 是否取消
job.isCompleted // 是否完成
一个协程包括的状态是:新建(New)、激活(Active)、完成中(Completing)、已完成(Completed)、取消中(Cacelling)、已取消(Cacelled)。
image.png五、协程的取消
【1】取消作用域会取消它的子协程。
// 定义一个协程作用域
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
delay(1000)
println(1)
}
scope.launch {
delay(1000)
println(2)
}
scope.cancel()
此时,两个协程全部被取消。
【2】被取消的子协程并不会影响其余兄弟协程
// 定义一个协程作用域
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
delay(1000)
println(1)
}
val job2 = scope.launch {
delay(1000)
println(2)
}
job1.cancel()
job1被取消,job2 没有被取消。
【3】协程通过抛出一个特殊的异常 CancellationException 来处理取消操作
// 定义一个协程作用域
val job = launch {
delay(1000)
println(1)
}
delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
job.cancel()
job.join()
delay 是一个挂起函数,当它被取消时,会报 CancellationException 异常,此时协程会直接被取消,我们可以利用 try...catch来捕获次异常:
val job = launch {
try {
delay(1000) // 一个挂起函数,处理耗时任务
println(1)
} catch (e: Exception) {
e.printStackTrace()
}
}
delay(100) // 延迟100毫秒,防止还没指定到delay就被取消
job.cancel()
job.join()
}
此时,打印了异常:
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@1b68ddbd
并且,协程被取消。
六、CPU密集型任务取消
val job = launch(Dispatchers.Default) {
var count = 0
while (count < 1000000000) {
count += 1
if (count % 1000 == 0) {
println(count)
}
}
}
delay(10)
job.cancelAndJoin()
以上代码使用了while循环,执行时CPU高度运作,当协程执行密集型任务时,协程无法被取消。
这时,需要结合job的生命周期,修改后的代码如下:
val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
var count = 0
while (count < 1000000000 && isActive) {
count += 1
if (count % 1000 == 0) {
println(count)
}
}
}
delay(10)
job.cancelAndJoin()
仅仅在 while 中 添加了 isActive,此时是可以被取消的。
还有一种方法是添加 ensureActive 实现:
val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败
var count = 0
while (count < 1000000000) {
ensureActive()
count += 1
if (count % 1000 == 0) {
println(count)
}
}
}
delay(10)
job.cancelAndJoin()
使用 ensureActive 和 使用 isActive, 从效果上是差不多的。
-
isActive: 是一个可以被使用在 CoroutineScope 中的扩展属性,检查 Job 是否处于活跃状态。
-
ensureActive():如果Job处于非活跃状态,则抛出异常。
-
yield 函数会检查所在协程的状态,如果已经取消,则抛出异常。次外,它还会尝试让出线程的执行权,给其他协程协程提供执行机会。
val job = launch(Dispatchers.Default) { // 注意:必须指定调度器,否则CPU密集型协程可能会取消失败 var count = 0 while (count < 1000000000) { yield() count += 1 if (count % 1000 == 0) { println(count) } } } delay(10) job.cancelAndJoin()
yield() 会让出线程执行权,其它线程执行完会继续执行。
比较:
1、isActive 和 ensureActive 可以让CPU密集型协程成功取消;
2、yield 既可以让CPU密集型协程成功取消,也可以让出一部分执行权给其它任务处理(当程序非常密集的时候使用)
七、协程取消的副作用
协程被取消时,会抛出异常,导致下面代码无法执行到,下面的代码可能是必须执行的逻辑,比如释放资源。
可以将必须要执行的代码放在 finally 中执行:
val job = launch(Dispatchers.Default) {
try {
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
} finally {
// 释放资源
println("release")
}
}
delay(10)
job.cancelAndJoin()
如果是文件操作,需要在 finally 中关闭资源:
val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
readBuffer.apply {
try {
var line: String? = null
while (true) {
line = readLine() ?: break
println(line)
}
} catch (e: Exception) {
e.printStackTrace()
} finally {
close()
}
}
使用标准库中的 use 函数,可以简化代码:
val readBuffer = BufferedReader(FileReader("D:\\xx.text"))
readBuffer.use {
var line: String? = null
while (true) {
line = readLine() ?: break
println(line)
}
}
use 函数中已经封装了try...catch,以及释放资源。
八、不能被取消的任务
val job = launch(Dispatchers.Default) {
withContext(NonCancellable) {
delay(1000)
println("1111111")
}
delay(1000)
println("2222222222")
}
delay(10)
job.cancelAndJoin()
使用 withContext(NonCancellable)
,任务可以不被取消。
九、超时任务
val result = withTimeoutOrNull(1000) { // 1秒超时任务
repeat(1000) {
println(1111)
delay(10)
}
"Success"
} ?: "Failed"
println(result)
使用 withTimeoutOrNull 实现超时任务。
[本章完...]