Android Kotlin

Android中对Kotlin Coroutines(协程)的理

2022-09-27  本文已影响0人  会上网的井底之蛙

(接第一部分)

不同根协程的协程之间,异常并不会自动传递,比如:

fun main ()  = runBlocking {
    val job1 = launch {
        delay(2000)
        println("job finished")
    }

    val job2 = CoroutineScope(Dispatchers.IO).async{
        delay(1000)
        println("job2 finished")
        throw IllegalArgumentException()
        println("new CoroutineScope finished")
    }

    delay(3000)
    println("finished")
}

运行结果:

job2 finished
job finished
finished

CancellationException(取消异常):

CancellationException 会被 CoroutineExceptionHandler 忽略,但能被 try-catch 捕获。

fun main ()  = runBlocking {
    val job1 = launch {
        delay(2000)
        println("job finished")
    }

    val job2 = launch {
        delay(1000)
        println("job2 finished")
    }
    job2.cancel() //job2取消息了,其实并没有触发CancellationException异常
    delay(3000)
    println("finished")
}

运行结果:

job finished
finished

捕捉一下取消异常:

fun main ()  = runBlocking {
    val job1 = launch {
        delay(2000)
        println("job finished")
    }

    val job2 = launch {
        try {
            delay(1000)
            println("job2 finished")
        } catch (e:Exception) {
            e.printStackTrace()
        }
    }
    job2.cancel()

    delay(3000)
    println("finished")
}

运行结果:

job finished
finished

奇怪,并没有发生异常,什么原因呢?

因为可取消的挂起函数会在取消时抛出CancellationException,上面delay(1000)会在取消时抛出CancellationException,但是上面的代码中 delay(1000)并没有执行,因为协程还没有开始执行就被 cancel

上面的例子稍加修改:

fun main ()  = runBlocking {
    val job1 = launch {
        delay(2000)
        println("job finished")
    }

    val job2 = launch {
        try {
            delay(1000)
            println("job2 finished")
        } catch (e:Exception) {
            e.printStackTrace()
        }
    }
    delay(500) //延迟500毫秒,让job2处于delay状态
    job2.cancel()

    delay(3000)
    println("finished")
}

运行结果:

kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@184f6be2
job finished
finished

去掉捕捉异常:

fun main ()  = runBlocking {
    val job1 = launch {
        delay(2000)
        println("job finished")
    }

    val job2 = launch {
        delay(1000)
        println("job2 finished")
    }
    delay(500)//延迟500毫秒,让job2处于delay状态
    job2.cancel()

    delay(3000)
    println("finished")
}

运行结果:

job finished
finished

为什么没有抛出异常呢?

因为kotlin的协程是这样规定的:CancellationException这个异常是被视为正常现象的取消。协程在内部使用 CancellationException 来进行取消,所有处理程序都会忽略这类异常,因此它们仅用作调试信息的额外来源,这些信息可以用 catch 块捕获。

如果不希望协程内的异常向上传播或影响同级协程。可以使用 SupervisorJob

协程的上下文为SupervisorJob时,该协程中的异常不会向外传播,因此不会影响其父亲/兄弟协程,也不会被其兄弟协程抛出的异常影响

我们常见的 MainScopeviewModelScopelifecycleScope 都是用 SupervisorJob()创建的,所以这些作用域中的子协程异常不会导致根协程退出

正确使用SupervisorJob的方法:

// job1、job2、job3和job4的父Job都是SupervisorJob

val scope = CoroutineScope(SupervisorJob()) 
job1 = scope.launch {...}
job2 = scope.launch {...}

supervisorScope { 
    job3 = launch {...}
    job4 = launch {...}
}

而不是采用launch(SupervisorJob()){...}这种方式(launch生成的协程的父jobSupervisorJob,其大括号内部的job依然是普通Job)

比如修改一下第一个例子:

fun main ()  = runBlocking {
    val scope = CoroutineScope(SupervisorJob())

    scope.launch {
        val job1 = scope.launch {
            delay(2000)
            println("job finished")
        }

        val job2 = scope.launch {
            delay(1000)
            println("job2 finished")
            throw IllegalArgumentException()
        }
        delay(2000)
        println("parent job finished")
    }

    delay(3000)
    println("finished")
}

运行结果:

job2 finished
Exception in thread "DefaultDispatcher-worker-3" java.lang.IllegalArgumentException
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$1$job2$1.invokeSuspend(Test.kt:396)
  ......  
job finished
parent job finished
finished

虽然job2发生发异常,但是并没有影响job1和父协程

但是如果采用不正确的方式,比如:

fun main() = runBlocking{
    val grandfatherJob = SupervisorJob()
    //创建一个Job,
    val job = launch(grandfatherJob) {
        //启动一个子协程
        val childJob1 = launch {
            println("childJob1 start")
            delay(1000)

            throw IllegalArgumentException()
            println("childJob1 end")
        }

        val childJob2 = launch {
            println("childJob2 start")
            delay(2000)
            println("childJob2 end")
        }
    }

    delay(3000)
    println("end")
}

运行结果:

childJob1 start
childJob2 start
Exception in thread "main" java.lang.IllegalArgumentException
  ...... 
end

可以看出childJob1的异常影响了childJob2,并没有阻止异常的传递,主要就是SupervisorJob的使用方式不对。

GlobalScope.launch方式启动的是顶层协程,本身不存在父协程,在里面发生异常后, 只会在logCat输出异常异常,并不会影响到外部线程的运行,比如:

fun main() = runBlocking {
    println("start")
    GlobalScope.launch {
        println("launch Throwing exception")
        throw NullPointerException()
    }
    Thread.sleep(3000)
    //GlobalScope.launch产生的异常不影响该线程执行
    println("end")
}

运行结果:

start
launch Throwing exception
Exception in thread "DefaultDispatcher-worker-1" java.lang.NullPointerException
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$1.invokeSuspend(Test.kt:511)
  ......
end

再比如:

fun main() = runBlocking {
    println("start")

    launch {
        GlobalScope.launch {
            println("launch Throwing exception")
            throw NullPointerException()
        }
        delay(1000)
        println("out launch end")
    }

    delay(3000)
    //GlobalScope.launch产生的异常不影响该线程执行
    println("end")
}

运行结果:

start
launch Throwing exception
Exception in thread "DefaultDispatcher-worker-1" java.lang.NullPointerException
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$1$1.invokeSuspend(Test.kt:513)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  ......
out launch end
end

GlobalScope.async呢?与GlobalScope.launch是不同的,因为GlobalScope.async在使用await()方法时会抛出异常,比如:

fun main() = runBlocking {
    println("start")
    val job = GlobalScope.async {
        println("launch Throwing exception")
        throw NullPointerException()
    }

    job.join()//采用join

    println("end")
}

输出:

start
launch Throwing exception
end

join改为await

fun main() = runBlocking {
    println("start")
    val job = GlobalScope.async {
        println("launch Throwing exception")
        throw NullPointerException()
    }

    job.await()//采用await

    println("end")
}

输出:

start
launch Throwing exception
Exception in thread "main" java.lang.NullPointerException
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$job$1.invokeSuspend(Test.kt:511)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  ......

可以看出GlobalScope.async出现异常后,当使用了await时,还是会影响外部线程。

2.协程的异常处理

launch:

通过launch启动的异常可以通过try-catch来进行异常捕获,或者使用协程封装的拓展函数runCatching来捕获(其内部也是使用的try-catch),还可以使用CoroutineExceptionHandler 对异常进行统一处理,这也更符合结构化并发原则。

使用try-catch时,要注意:不要用try-catch直接包裹launch、async

使用CoroutineExceptionHandler 捕获异常需要满足:

CoroutineExceptionHandler 需要存在于 CoroutineScopeCoroutineContext 中,或者在 CoroutineScope 或者 supervisorScope 创建的直接子协程中。

采用try-catch的例子:

fun main() = runBlocking {
    val scope = CoroutineScope(Job())

    scope.launch {
        try {
            throw NullPointerException("a exception")
        } catch(e: Exception) {
            println("handle exception : ${e.message}")
        }
    }

    delay(1000)
}

输出:

handle exception : a exception

or

fun main() = runBlocking {
    val scope = CoroutineScope(Job())

    scope.launch {
        runCatching {
            throw NullPointerException("a exception")
        }.onFailure {
            println("handle exception : ${it.message}")
        }
    }

    delay(1000)
}

输出:

handle exception : a exception

如果直接用try-catch包裹launch

fun main() = runBlocking {
    val scope = CoroutineScope(Job())

    try {
        scope.launch {
            throw NullPointerException("a exception")
        }
    } catch(e: Exception) {
        println("handle exception : ${e.message}")
    }


    delay(1000)
}

输出:

Exception in thread "DefaultDispatcher-worker-1" java.lang.NullPointerException: a exception
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$1.invokeSuspend(Test.kt:530)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
  ......

可以发现,异常并没有被捕获,所以要将try-catch放到协程体内部

采用CoroutineExceptionHandler的例子:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job()+handleException)

    scope.launch {
        throw NullPointerException("a exception")
    }

    delay(1000)
}

输出:

CoroutineExceptionHandler catch java.lang.NullPointerException: a exception

or

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job())

    scope.launch(handleException) {
        throw NullPointerException("a exception")
    }

    delay(1000)
}

输出:

CoroutineExceptionHandler catch java.lang.NullPointerException: a exception

如果改为这样呢:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job())

    scope.launch {

        launch(handleException) {
            throw NullPointerException("a exception")
        }
    }

    delay(1000)
}

输出:

Exception in thread "DefaultDispatcher-worker-2" java.lang.NullPointerException: a exception
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$1$1.invokeSuspend(Test.kt:536)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  ......

因为CoroutineExceptionHandler使用的位置不对,所以并没有发挥作用

再修改一下:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job())

    scope.launch(handleException) {

        launch {
            throw NullPointerException("a exception")
        }
    }

    delay(1000)
}

输出:

CoroutineExceptionHandler catch java.lang.NullPointerException: a exception

所以要注意使用CoroutineExceptionHandler 捕获异常时需要满足的条件。

async:

根协程(由协程作用域直接管理的协程) ,await 时抛出异常:

比如:

fun main() = runBlocking {
    //async 开启的协程为根协程
    val deferred = GlobalScope.async {
        throw Exception()
    }
    try {
        deferred.await() //抛出异常
    } catch (t: Throwable) {
        println("捕获异常:$t")
    }
} 

输出:

捕获异常:java.lang.Exception

supervisorScope 的直接子协程,await 时抛出异常:

fun main() = runBlocking {
    supervisorScope {
        //async 开启的协程为 supervisorScope 的直接子协程
        val deferred = async {
            throw Exception()
        }
        try {
            deferred.await() //抛出异常
        } catch (t: Throwable) {
            println("捕获异常:$t")
        }
    }
}

输出:

捕获异常:java.lang.Exception

根协程(由协程作用域直接管理的协程) ,不用await:

fun main() = runBlocking {
    //async 开启的协程为根协程
    val deferred = GlobalScope.async {
        println("async a coroutine")
        throw Exception()
    }
    try {
        deferred.join()
    } catch (t: Throwable) {
        println("捕获异常:$t")
    }
    delay(1000)
    println("end")
}

输出:

async a coroutine
end

上面并没有捕捉到异常,外部的线程也没有被影响

supervisorScope 的直接子协程,不用await

fun main() = runBlocking {
    supervisorScope {
        //async 开启的协程为 supervisorScope 的直接子协程
        val deferred = async {
            println("async a coroutine")
            throw Exception()
        }

        try {
            deferred.join()
        } catch (t: Throwable) {
            println("捕获异常:$t")
        }
    }
    delay(1000)
    println("end")
}

输出:

async a coroutine
end

上面并没有捕捉到异常,外部的线程也没有被 影响

如果是同一线程呢,比如:

override fun onCreate(savedInstanceState: Bundle?) {
    ......
    test()
    println("ddd test end")
}

fun test() {
    MainScope().launch {
        //async 开启的协程为协程作用域直接管理的协程
        val deferred = GlobalScope.async(Dispatchers.Main) {
            println("ddd async a coroutine thread:${Thread.currentThread().name}")
            throw Exception()
        }
        try {
            deferred.join()
        } catch (t: Throwable) {
            println("ddd 捕获异常:$t")
        }
        delay(1000)
        println("ddd end thread:${Thread.currentThread().name}")
    }
}

输出:

ddd test end
ddd async a coroutine thread:main
ddd end thread:main

可看出async在主线程发生了异常,但是没有影响主线程的执行,把deferred.join()去掉结果也一样

其它情况,异常会在发生时立刻抛出并传播,需要在异常发生的地方进行捕捉,比如:

fun main() = runBlocking {

    val deferred = async {
        try {
            println("async a coroutine")
            throw Exception()
        } catch (t: Throwable) {
            println("捕获异常:$t")
        }
    }

    deferred.await() //不能在此捕捉,虽然能捕捉到异常,但是无法阻止异常的传播

    delay(1000)
    println("end")
}

输出:

async a coroutine
捕获异常:java.lang.Exception
end

使用CoroutineExceptionHandler

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val deferred = scope.async {
        println("async a coroutine")
        throw Exception()
    }

    deferred.await() 

    delay(1000)
    println("end")
}

输出:

async a coroutine
Exception in thread "main" java.lang.Exception
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$deferred$1.invokeSuspend(Test.kt:627)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  ......

奇怪,并没有捕捉到异常,为什么?可以再回头看看async的第一条,原来是因为async产生了顶级协程,只能在await时捕捉,改一下:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    //由于为顶级协程,故CoroutineExceptionHandler不会起作用
    val deferred = scope.async {
        println("async a coroutine")
        throw Exception()
    }

    try {
        deferred.await() //async顶级协程需要在此捕捉
    } catch (t: Throwable) {
        println("捕获异常:$t")
    }


    delay(1000)
    println("end")
}

发生嵌套的情况

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val deferred = scope.async {
        async {
            launch {
                println("async a coroutine")
                throw Exception()
            }
        }
    }

    //下面的try-catch所有代去掉,将不会产生异常
    try {
        deferred.await()
    } catch (t: Throwable) {
        println("捕获异常:$t")
    }


    delay(1000)
    println("end")
}

输出:

async a coroutine
捕获异常:java.lang.Exception
end

可以看出CoroutineExceptionHandler并没有起作用,异常只会在deferred.await()抛出,同时只能在此捕获,原因就是协程作用域直接管理async,符合第一条

去掉try-catch,验证一下:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val deferred = scope.async {
        async {
            launch {
                println("async a coroutine")
                throw Exception()
            }
        }
    }
    
    delay(1000)
    println("end")
}

输出:

async a coroutine
end

此时并没有抛出异常

将最外层改为launch

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val job = scope.launch {
        async {
            launch {
                println("async a coroutine")
                throw Exception()
            }
        }
    }

    delay(1000)
    println("end")
}

输出:

async a coroutine
CoroutineExceptionHandler catch java.lang.Exception
end

可以看出抛出了异常,而且被CoroutineExceptionHandler捕获

我们说过,要把async下面的看成一个整体,可以验证一下:

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val deferred = scope.async {
        async {
            val deferred2 = async {
                println("async a coroutine")
                throw Exception()
            }
            println("async in await")
            deferred2.await()  //去掉这条代码,也一样不会抛出异常
            println("async in end") //不会执行,因为执行前出现异常
        }
        println("async out delay start")
        delay(1000)
        println("async out end")  //不会执行,因为执行前出现异常
    }

    delay(1000)
    println("end")
}

输出:

async out delay start
async in await
async a coroutine
end

可以看出并没有抛出异常,因为最终要交给最外层的async来处理,里面的子协程自行处理并没有用。

但是要注意一点,虽然没有抛出异常,但是异常发生后,async里面异常发生点后面的代码是不会执行的

给最外层asyncawait

fun main() = runBlocking {

    val handleException = CoroutineExceptionHandler { _, throwable ->
        println("CoroutineExceptionHandler catch $throwable")
    }

    val scope = CoroutineScope(Job() + handleException )

    val deferred = scope.async {
        async {
            val deferred2 = async {
                println("async a coroutine")
                throw Exception()
            }
            println("async in await")
            deferred2.await()
            println("async in end")
        }
        println("async out delay start")
        delay(1000)
        println("async out end")
    }

    deferred.await() //加上try-catch后能捕捉并拦截异常

    delay(1000)
    println("end")
}

输出:

async out delay start
async in await
async a coroutine
Exception in thread "main" java.lang.Exception
  at com.test.project.newgb.bluetooth.utils.TestKt$main$1$deferred$1$1$deferred2$1.invokeSuspend(Test.kt:629)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  ......

这符合第一条

三、协程代码执行顺序

上面讲到异步代码能按顺序执行,同步代码又可以不按顺序执行,那协程代码的执行顺序到底是什么规律呢?这个问题也是我学习协程时最纳闷的一点,搞不清楚什么时候会顺序执行,什么时候又挂起当前的代码去执行其它代码;什么时候会等待函数执行完成,什么时候又不等待函数执行完成。

要掌握协程代码执行顺序的规律,必须要明白Suspend function

协程的启动模式

协程的四种启动模式虽说除了LAZY 之外,其它都是在创建时立即调度执行,但是协程内部代码的执行一般晚于其外部代码(多线程下可能会早于,取决于线程的调度),比如上面的例子中,test()函数创建了一个协程,其内部的代码执行是晚于println("ddd test end")这条的,协程内部的执行要等待线程调度的到来。LAZY 模式就更不用说了,需要显示调用才会开始执行内部逻辑

Suspend function

挂起函数能挂起协程并恢复,所以自然的可以影响程序执行顺序

awaitjoin函数

使用Defrred.await()Job.join()都可以使协程等待其它协程的执行结果,属于Suspend function的特例,好处是可以无缝衔接程序的并发执行
Defrred.await()能让调用await的协程挂起并等待Defrred对象所代表的协程执行完毕后立即恢复执行
Job.join()可以让子协程执行完毕后父协程才会执行完毕

也许我们用delay也能实现想要的顺序,但是却不能实现无缝衔接,举例:

private suspend fun intValue1(): Int {
    delay(1000)  //模拟多线程执行时间 1秒
    return 1
}

private suspend fun intValue2(): Int {
    delay(2000) //模拟多线程执行时间 2秒
    return 2
}

fun main() = runBlocking {

    val elapsedTime = measureTimeMillis {
        var value1 = 0
        var value2 = 0
        //下面两个协程也是在并发执行
        async { value1 = intValue1() }
        async { value2 = intValue2() }
        
        delay(3000)

        println("the result is ${value1 + value2}")
    }

    println("the elapsedTime is $elapsedTime")
}

输出:

the result is 3
the elapsedTime is 3012

上面代码中,我们大概知道两个协程执行的时间,所以等3秒后肯定能得到正确结果,2.5秒也可以,但是这种方式问题很大,因为这个等待的时间不好把握,等待时间过长效率不高,等待时间过短,有可能子协程还没出结果。如果采用Defrred.await(),就能完美解决,改一下:

private suspend fun intValue1(): Int {
    delay(1000)  //模拟多线程执行时间 1秒
    return 1
}

private suspend fun intValue2(): Int {
    delay(2000) //模拟多线程执行时间 2秒
    return 2
}

fun main() = runBlocking {

    val elapsedTime = measureTimeMillis {
        val value1 = async { intValue1() }
        val value2 = async { intValue2() }

        println("the result is ${value1.await() + value2.await()}")
    }

    println("the elapsedTime is $elapsedTime")
}

输出:

the result is 3
the elapsedTime is 2022

Job.join()的例子:

fun main() = runBlocking {

    //注意,GlobalScope.launch生成的协程并不是runBlocking的子协程
    GlobalScope.launch {
        launch {
            delay(2000)
            println("inner launch1")
        }

        launch {
            delay(1000)
            println("inner launch2")
        }

    }

    println("end")
}

输出:

end

加上delay

fun main() = runBlocking {
    //注意,GlobalScope.launch生成的协程并不是runBlocking的子协程
    GlobalScope.launch {
        launch {
            delay(2000)
            println("inner launch1")
        }

        launch {
            delay(1000)
            println("inner launch2")
        }

    }
    delay(3000)
    println("end")
}

输出:

inner launch2
inner launch1
end

上面虽然是我们要的结果,但是这个delay的时间不好把握,用join:

fun main() = runBlocking {

    val job = GlobalScope.launch {
        launch {
            delay(2000)
            println("inner launch1")
        }

        launch {
            delay(1000)
            println("inner launch2")
        }

    }

    job.join()
    
    println("end")
}

输出:

inner launch2
inner launch1
end

举例:

override fun onCreate(savedInstanceState: Bundle?) {
      ......
      test()
      println("test end")
}

fun test() {
    MainScope().launch {
        val job = GlobalScope.launch(Dispatchers.Main) {
            launch {
                println("inner launch1 start thread:${Thread.currentThread()}")
                var i = 0
                while (i++ <= 5){
                    println("inner launch1 print $i")
                    Thread.sleep(500)
                }
                println("inner launch1 end")
            }

            launch {
                println("inner launch2 start thread:${Thread.currentThread()}")
                var i = 0
                while (i++ <= 5){
                    println("inner launch2 print $i")
                    Thread.sleep(500)
                }
                println("inner launch2 end")
            }


            println("withContext creating")
            
            withContext(Dispatchers.IO) {
                println("withContext thread:${Thread.currentThread().name}")
            }

            println("out launch end thread:${Thread.currentThread().name}")
        }

        job.join()

        println("fun end")
    }
}

输出:

test end
withContext creating
withContext thread:DefaultDispatcher-worker-1
inner launch1 start thread:Thread[main,5,main]
inner launch1 print 1
inner launch1 print 2
inner launch1 print 3
inner launch1 print 4
inner launch1 print 5
inner launch1 print 6
inner launch1 end
inner launch2 start thread:Thread[main,5,main]
inner launch2 print 1
inner launch2 print 2
inner launch2 print 3
inner launch2 print 4
inner launch2 print 5
inner launch2 print 6
inner launch2 end
out launch end thread:main
fun end

父协程在执行挂起函数withContext时,子协程开始运行,如果我们将withContext也改在Main线程

 withContext(Dispatchers.IO) {
     println("withContext thread:${Thread.currentThread().name}")
 }
 改为
 withContext(Dispatchers.Main) {
     println("withContext thread:${Thread.currentThread().name}")
 }

结果为:

test end
withContext creating
withContext thread:main
out launch end thread:main
inner launch1 start thread:Thread[main,5,main]
inner launch1 print 1
inner launch1 print 2
inner launch1 print 3
inner launch1 print 4
inner launch1 print 5
inner launch1 print 6
inner launch1 end
inner launch2 start thread:Thread[main,5,main]
inner launch2 print 1
inner launch2 print 2
inner launch2 print 3
inner launch2 print 4
inner launch2 print 5
inner launch2 print 6
inner launch2 end
fun end

从结果能看到withContext虽然是挂起函数,但是其里面的执行线程没有变化,并没有起到挂起的作用

改为多线程:

fun test() {
    MainScope().launch {
        val job = GlobalScope.launch {
            launch {
                println("inner launch1 start thread:${Thread.currentThread()}")
                var i = 0
                while (i++ <= 5){
                    println("inner launch1 print $i")
                    Thread.sleep(500)
                }
                println("inner launch1 end")
            }

            launch {
                println("inner launch2 start thread:${Thread.currentThread()}")
                var i = 0
                while (i++ <= 5){
                    println("inner launch2 print $i")
                    Thread.sleep(500)
                }
                println("inner launch2 end")
            }

            println("withContext creating")
            withContext(Dispatchers.Main) {
                println("withContext thread:${Thread.currentThread().name}")
            }

            println("out launch end thread:${Thread.currentThread().name}")
        }

        job.join()

        println("fun end")
    }
}

输出:

test end
inner launch1 start thread:Thread[DefaultDispatcher-worker-1,5,main]
inner launch1 print 1
inner launch2 start thread:Thread[DefaultDispatcher-worker-3,5,main]
inner launch2 print 1
withContext creating
withContext thread:main
out launch end thread:DefaultDispatcher-worker-4
inner launch1 print 2
inner launch2 print 2
inner launch1 print 3
inner launch2 print 3
inner launch1 print 4
inner launch2 print 4
inner launch1 print 5
inner launch2 print 5
inner launch1 print 6
inner launch2 print 6
inner launch1 end
inner launch2 end
fun end

可以看出,父子协程运行就是一个多线程并发的方式,如果不考虑子协程,父协程里的代码执行就是常规的顺序执行

四、协程核心概念

“Active” 状态下,一个 Job 正在运行并执行它的工作。如果 Job 是通过协程构建器创建的,这个状态就是协程主体运行时的状态。在这种状态下,我们可以启动子协程。大多数协程会在 “Active” 状态下启动。只有那些延迟启动的才会以 “New” 状态启动。当它完成时候,它的状态变为 “Completing”,等待所有子协程完成。一旦它的所有子协程任务都完成了,其状态就会变为 “Completed”,这是一个最终状态。或者,如果 Job 在运行时候(在 “Active” 或者 “Completing” 状态下)取消或失败,其状态将会改变成为 “Cancelling”。在这种状态下,最后还可以做一些清理工作,比如关闭连接或释放资源。完成此操作后, Job 将会进入到 “Cancelled” 状态。

Job存在父子关系,比如:

val grandfatherJob = SupervisorJob()
//创建一个Job,
val job = GlobalScope.launch(grandfatherJob) {
    //启动一个子协程
    val childJob = launch {
    }
}

上面的代码中有三个JobgrandfatherJob、job、childJob,其中job父亲为grandfatherJobchildJob父亲为job

增加打印语句,来印证一下:

fun main() = runBlocking{
    val grandfatherJob = SupervisorJob()
    //创建一个Job,
    val job = GlobalScope.launch(grandfatherJob) {
        println("job start")
        //启动一个子协程
        val childJob = launch {
            println("childJob start")
        }
        println("job end")
    }

    println("job's child is ${job.children.elementAtOrNull(0)}")

    println("grandfatherJob's child is ${grandfatherJob.children.elementAtOrNull(0)}")

    println("end")
}

输出:

job start
job end
childJob start
job's child is null
grandfatherJob's child is null
end

上面不是说:job父亲为grandfatherJobchildJob父亲为job,为什么打印出来jobgrandfatherJob的子协程都为空呢?

主要是如果子协程如果执行完了,会自动从children这个Sequence中清除掉,如果我们在打印child时,让子协程还在运行中:

fun main() = runBlocking{
    val grandfatherJob = SupervisorJob()
    //创建一个Job,
    val job = GlobalScope.launch(grandfatherJob) {
        println("job start")
        //启动一个子协程
        val childJob = launch {
            println("childJob start")
            delay(1000)  //延迟1秒
        }

        delay(2000) //延迟2秒
        println("job end")
    }

    println("job's child is ${job.children.elementAtOrNull(0)}")

    println("grandfatherJob's child is ${grandfatherJob.children.elementAtOrNull(0)}")

    println("end")
}

结果如下:

job start
childJob start
job's child is StandaloneCoroutine{Active}@59e5ddf
grandfatherJob's child is StandaloneCoroutine{Active}@536aaa8d
end

运行结果与预想一致。

Job的父子关系如何建立:

协程构建器基于其父 Job 构建其 Job

每个协程构建器都会创建其它们自己的 Job,大多数协程构建器会返回 Job

Job 是唯一一个不是子协程直接继承父协程的上下文(上下文即CoroutineContextJob也是继承自CoroutineContext)。每个协程都会创建自己的 Job,来自传递参数或者父协程的 Job 将会被用作这个子协程所创建 Job 的父 Job,比如:

fun main() = runBlocking {
    val name = CoroutineName("Some name")
    val job = Job()

    launch(name + job) {
        val childName = coroutineContext[CoroutineName]
        println(childName == name) // true
        //childJob是在launch中新建的Job,但其与”val job = Job()“中的job保持着父子关系
        val childJob = coroutineContext[Job] 
        println(childJob == job) // false
        println(childJob == job.children.first()) // true
    }
}

如果新的 Job 上下文取代了父 Job 的上下文,结构化并发机制将不起作用,比如:

fun main(): Unit = runBlocking {
    launch(Job()) { // 使用新 Job 取代了来自父协程的 Job
        delay(1000)
        println("Will not be printed")
    }
}
// (不会打印任何东西,程序会马上结束))

在上面的例子中,父协程将不会等待子协程,因为它与子协程没有建立关系,因为子协程使用来自参数的 Job 作为父 Job,因此它与 runBlockingJob 没有关系。

下面再用两段程序作说明:

private fun test1() {
    //总共有5个Job:SupervisorJob、newJob、Job0、Job1、Job2
    val scope = MainScope() //SupervisorJob(无子Job)
    
    //Job()会生成newJob,scope.launch会生成Job0,而Job0的父Job是newJob,Job0的子Job是Job1、Job2
    scope.launch(Job()) {  //此处使用新 Job 取代了来自父协程的 Job
        launch { //Job1
            delay(2000L)
            println("CancelJobActivity job1 finished")
            scope.cancel()
        }
        launch { //Job2
            delay(3000L)
            println("CancelJobActivity job2 finished") //会输出
        }
    }
}

private fun test2() {
    //总共有4个Job:SupervisorJob、Job0、Job1、Job2
    val scope = MainScope()//SupervisorJob(子Job为Job0)
    scope.launch { //Job0(子Job为Job1、Job2)
        launch { //Job1
            delay(2000L)
            println("CancelJobActivity job1 finished")
            scope.cancel()

        }
        launch { //Job2
            delay(3000L)
            println("CancelJobActivity job2 finished") //不会输出
        }
    }
}

Job 使用join 方法用来等待,直到所有协程完成。这是一个挂起函数,它挂起直到每个具体的子 Job 达到最终状态(Completed 或者 Cancelled)。比如:

fun main(): Unit = runBlocking {
    val job1 = launch {
        delay(1000)
        println("Test1")
    }
    val job2 = launch {
        delay(2000)
        println("Test2")
    }
    job1.join()
    job2.join()
    println("All tests are done")
}

输出:

Test1
Test2
All tests are done

上面例子中,可以看到Job 接口还暴露了一个 children 属性,允许我们访问它的所有子 job,比如:

fun main(): Unit = runBlocking {
    launch {
        delay(1000)
        println("Test1")
    }
    launch {
        delay(2000)
        println("Test2")
    }
    
    val children = coroutineContext[Job]
        ?.children
    val childrenNum = children?.count()
    println("Number of children: $childrenNum")
    children?.forEach { it.join() }
    println("All tests are done")
}

输出:

Number of children: 2
Test1
Test2
All tests are done

理解:join()调用在哪个协程之中,则这个协程的结束需要等待调用join函数的Job结束。上面的例子,join()runBlocking之中被调用,所以runBlocking要结束,需要等待job1、job2先结束。

五、协程原理

一个问题:能不能实现一个java库,实现类似kotlin协程的功能?

我认为理论上是可以的,但肯定没协程这般优雅。因为需要用同步的方式写出异步执行的代码,所以代码肯定需要在编译前进行处理,可以参考一下Butterknife中用到的APT(注解处理器),APT能根据注解自动生成代码。掌握了协程的原理,能更好的回答这个问题。

看看函数体的转变:

suspend fun getUserInfo(): String { 
    delay(5000L)
    
    return "BoyCoder"
}

变为:

public static final Object getUserInfo(@NotNull Continuation var0) {
    Object $continuation;
    label20: {
         if (var0 instanceof GetUserInfoMachine) {
            $continuation = (GetUserInfoMachine)var0;
            if (($continuation).label & Integer.MIN_VALUE) != 0) {
               $continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new GetUserInfoMachine(var0);    
    }
    
    Object $result = $continuation).result; 
    Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch($continuation).label) {   
      case 0:
         ResultKt.throwOnFailure($result);
         $continuation.label = 1;
         if (DelayKt.delay(5000L, (Continuation)$continuation) == var3) {
            return var3;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");  
    }
     
    return "BoyCoder";
}

static final class GetUserInfoMachine extends ContinuationImpl {
    Object result;
    int label;

    GetUserInfoMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return TestKt.getUserInfo(null, (Continuation<? super String>) this);
    }
}

GetUserInfoMachine的继承关系如下:

GetUserInfoMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // 在每个恢复的continuation进行调试探测,使得调试库可以精确跟踪挂起的调用栈中哪些部分
            // 已经恢复了。
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }    
    
    ...
}

delay的定义:

public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
    if (timeMillis <= 0L) {
        return Unit.INSTANCE;
    } else {
        // 实现类
        CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellableContinuationImpl.initCancellability();
        // 向上转型
        CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
        if (timeMillis < Long.MAX_VALUE) {
            // 延时操作
            getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        }
      // 获取执行结果
        Object result = cancellableContinuationImpl.getResult();
        if (result == COROUTINE_SUSPENDED) {
            DebugProbesKt.probeCoroutineSuspended($completion);
        }
      // 返回结果
        return result;
    }
}

getUserInfo函数执行的过程:

1.调用getUserInfo函数,传入var0(即自动产生的续体对象,Continuation类型),其不为GetUserInfoMachine类型,所以new了一个GetUserInfoMachine对象,并将var0保存到GetUserInfoMachine对象中,同时将GetUserInfoMachine对象赋给$continuation变量

2.由于$continuation.label = 0,执行case 0分支

3.case 0分支中将$continuation.label置为1,调用DelayKt.delay方法

4.执行delay方法,$continuation传入到delay中(保存在变量$completion中,协程恢复时会用到),delay返回COROUTINE_SUSPENDED,表示挂起

5.case 0中,直接return 结果 ,最后getUserInfo函数返回COROUTINE_SUSPENDED

6.因为getUserInfo函数已返回COROUTINE_SUSPENDEDgetUserInfo函数暂时执行完毕,线程执行其它动作(通过暂时结束方法调用的方式,让协程暂时不在这个线程上面执行,线程可以去处理其它的任务,协程的挂起就不会阻塞当前的线程,这就是为什么协程能非阻塞式挂起)

7.目前getUserInfo函数所在的协程处于挂起状态,而delay函数会在某个子线程执行等待操作(这也是为什么我们的suspend函数一定要调用系统的suspend函数的原因,系统的函数才有这个能力),等延时时间到达之后,就会调用传给delay函数的$completionresumeWith方法,也就是调用GetUserInfoMachineresumeWith方法,即BaseContinuationImplresumeWith方法,来进行协程的恢复

8.BaseContinuationImplresumeWith方法会调用到GetUserInfoMachine对象的invokeSuspend方法

9.invokeSuspend方法中,又开始调用getUserInfo函数,传入var0参数,此时var0为之前创建的GetUserInfoMachine对象

10.由于$continuation.label = 1,执行case 1分支

11.最后getUserInfo函数执行结束并返回了"BoyCoder"

12.此时回到第8步的BaseContinuationImplresumeWith方法中,invokeSuspend执行的结果即是第11步返回的"BoyCoder",保存到了outcome变量中

13.resumeWith方法接着执行Result.success(outcome),并将结果保存到外部的val outcome: Result<Any?> ,这个变量中

14.completion变量(此变量就是第1步中传入的var0)此时不为BaseContinuationImpl,最后会执行completion.resumeWith(outcome),表示结束,我们可以看看源码里的注释:// top-level completion reached -- invoke and return翻译过来就是:达到顶级完成--调用并返回

说明一下,如果getUserInfo函数调用的不是delay,而是另一个有返回值的suspend函数,就会执行if (completion is BaseContinuationImpl) {}里的代码,形成一种递归调用

另外我们说过挂起函数挂起的是整个协程,不是挂起函数本身,所以第6步里getUserInfo函数返回COROUTINE_SUSPENDED后,协程里面,getUserInfo挂起函数后面的代码暂时是不会执行的,协程本身也是一个状态机,整个协程也暂时会返回COROUTINE_SUSPENDED,所以协程才会因为挂起函数而挂起

上一篇下一篇

猜你喜欢

热点阅读