Kotlin中协程封装与取消
1. 将java中的回调接口封装成挂起函数
- 以OKHttp请求为例,代码如下 :
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Response
import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
suspend fun Call.await(): String = suspendCoroutine { block ->
enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
block.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
if (response.isSuccessful) {
block.resume(response.body()!!.string())
}
}
})
}
- 挂起函数的使用如下:
val client: OkHttpClient = OkHttpClient.Builder() .build()
val request: Request = Request.Builder()
.get()
.url("http://xxx")
.build()
GlobalScope.async(Dispatchers.Main) {
val call = client.newCall(request)
val data = call.await()
}
因为Call的扩展函数await()是个挂起函数,所以必须运行在协程或者其他的挂起函数中
-
对OkHttp的Call类继续扩展,添加一个await()函数,函数的返回值为 String类型,在函数中使用挂起函数suspendCoroutine{}或suspendCancellableCoroutine{},await()函数调用时,首先会挂起当前协程,然后执行enqueue将网络请求放入队列中,当请求成功时,通过block.resume(response.body()!!.string())来恢复之前的协程。
-
suspendCancellableCoroutine{}函数实现功能的函数:
delay()函数
yield()函数:挂起当前协程,然后将协程分发到 Dispatcher 的队列,这样可以让该协程所在线程或线程池可以运行其他协程逻辑,然后在 Dispatcher 空闲的时候继续执行原来协程。简单的来说就是让出自己的执行权,给其他协程使用,当其他协程执行完成或也让出执行权时,一开始的协程可以恢复继续运行
2. 父子协程
-
GlobalScope.launch()函数和GlobalScope.async()函数是没有父协程的被称为全局协程。
-
当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过 CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程任务的子任务。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。例子如下:
GlobalScope.launch(Dispatchers.Main) {
//父协程
launch {
//子协程
}
async {
//子协程
}
withContext(coroutineContext){
//子协程
}
}
-
协程间父子关系有三种影响:
-
父协程手动调用cancel()或者异常结束,会立即取消它的所有子协程。
-
父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。
-
子协程抛出未捕获的异常时,默认情况下会取消其父协程。
-
3. 协程取消
- 实例1:
fun main() = runBlocking {
val job = launch {
launch {
repeat(6) {
println("run_child:$it")
delay(500)
}
}
repeat(6) {
println("run_parent:$it")
delay(500)
}
}
delay(1600)
job.cancel()
println("job end")
}
-
结果
-
上面代码中 job 取消后,delay()会检测协程是否已取消,所以 job.cancel()调用之后上面的父协程和子协程也都会取消
-
实例2:
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var nextTime = 0L
var i = 1
while (i <= 3) {
val nowTime = System.currentTimeMillis()
if (nowTime >= nextTime) {
println("parent_${i++}")
nextTime = nowTime + 500L
}
}
}
delay(800)
job.cancel()
println("job end")
}
-
结果
-
上面代码中 job 取消后,没有检测协程状态的逻辑,都是计算逻辑,所以 job 的运算逻辑还是会继续运行。
-
为了可以及时取消协程的运算逻辑,可以检测协程的状态,使用isActive来判断,上面示例中可以将while(i <= 3)替换为while(isActive)。代码如下:
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var nextTime = 0L
var i = 1
while (isActive) {
val nowTime = System.currentTimeMillis()
if (nowTime >= nextTime) {
println("parent_${i++}")
nextTime = nowTime + 500L
}
}
}
delay(1200)
job.cancel()
println("job end")
}
-
结果
-
运行不能取消的代码块当手动取消协程后,像delay()这样的可取消挂起函数会在检测到已取消状态时,抛出 CancellationException 异常,然后退出协程。此时可以使用try { ... } finally { ... }表达式或<T : Closeable?, R> T.use {}函数执行终结动作或关闭资源。但是如果在finally块中调用自定义的或系统的可取消挂起函数,都会再次抛出 CancellationException 异常。通常我们在finally块中关闭一个文件,取消一个任务或者关闭一个通信通道都是非阻塞,并且不会调用任何挂起函数。当需要挂起一个被取消的协程时,可以将代码包装在withContext(NonCancellable) { ... }中。
4.超时取消
- 协程库中已经提供来withTimeout() { ... }挂起函数来实现在超时后自动取消协程。它会在超时后抛出TimeoutCancellationException,它是 CancellationException 的子类,它是协程结束的正常原因,不会打印堆栈跟踪信息,如果在取消后需要执行一些关闭资源的操作可以使用前面提到的try { ... } finally { ... }表达式。
try {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
} finally {
println("I'm running finally")
}
- 还有一个withTimeoutOrNull() { ... }挂起函数在超时后返回null,而不是抛出一个异常。
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" /* 在它运行得到结果之前取消它,如果循环次数x延迟时间小等于1300则返回Done,否则返回null*/
}
println("Result is $result")