精通AndroidKotlin-Coroutines

kotlin进阶—深入理解协程

2021-04-11  本文已影响0人  Peakmain

概念

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7'

语法

Delays coroutine for a given time without blocking a thread and resumes it after a specified time.

此功能实际只是延迟协程执行,而不会去阻塞线程

job生命周期

协程和线程

1、协程实现延迟

fun main() {
   GlobalScope.launch {
       delay(1000)
       println("Kotlin")
   }
    println("Hello,")
    Thread.sleep(2000)
    println("World!")
}
  • 1、整个main实际是一个线程
  • 2、delay不会阻塞线程,而是将协程延迟一秒,所以会向下执行线程中的代码打印hello
  • 3、Thread.sleep会阻塞线程,过一秒之后,由于协程延迟时间已过,则会执行协程向下打印,打印出kotlin
  • 4、2秒后,打印出World!

如果Thread.sleep(900)会打印什么?
我们会发现,只打印了Hello,World!。原因是0.9秒之后打印完World!后,线程已经结束,而协程是依附在线程上,所以线程结束,协程也已经结束

2、线程实现

fun main() {
    thread {
        Thread.sleep(1000)
        println("Kotlin")
    }
    println("Hello,")
    Thread.sleep(2000)
    println("World!")
}

运行结果后,我们会发现运行结果和代码一是一样的

如果此时将Thread.sleep(2000)变成Thread.sleep(900)会打印什么?
我们会发现,当我们打印出Hello,World!之后会继续打印Kotlin

3、runBlocking实现

fun main() {

    GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
    runBlocking {
        delay(2000)
    }
    println("World!")
}

上面代码还可以这么写

fun main()= runBlocking {

    GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")

    delay(2000)
    println("World!")
}

结果与上面两个方法一致

4、任务泄漏

CoroutinueScope

协程的启动和取消

启动和协程

launch源码

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch实际返回的job对象,而job中有个join方法。注意这里的block是suspend方法,而suspend方法只有suspend 函数或协程可被调用

join的源码

    /**
     * Suspends the coroutine until this job is complete. This invocation resumes normally (without exception)
     * when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
     * This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
     */
    public suspend fun join()

join的作用就是会挂起协程直到任务完成,随后会恢复正常

fun main() = runBlocking {

    val myJob = GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
    myJob.join()
    println("World!")

}

流程:首先会走launch,因为delay了1秒,所以走线程,打印Hello,此时继续向下执行遇到了join,join的作用是会等到launch方法执行完毕后,再向下执行

runBlocking+launch的使用

fun main() = runBlocking {

    launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
}

我们会发现:先打印Hello,后打印Kotlin,而不是像之前协程实现延迟一样,打印Hello就结束了。这是因为每一个协程构建器都会向其代码块作用域添加一个CoroutinueScope实例,我们可以在该 作用域启动协程,而无需显示将其join到一起,理由是外部协程(就是上面实例中的runBlocking)会等待所有启动协程全部完成后才会完成

而GlobalScope.launch会退出的原因是GlobalScope是守护线程

coroutineScope
等待所有子协程完成其任务,但并不会阻塞当前的线程

fun main()= runBlocking{
     launch {
         delay(1000)
         println("launch 1")
     }
    println("main 1")
    coroutineScope {
        launch {
            delay(3000)
            println("coroutineScope 1")
        }
        delay(500)
        println("coroutineScope 1")
    }
    println("main 2")
}

结果:
main 1
coroutineScope 1
launch 1
coroutineScope 1
main 2

coroutineScope是挂起函数,也就是说当前协程会被挂起,那么coroutineScope函数也会被挂起。此时它创建的对象runBlocking函数会继续执行之前的代码,但是下面的代码必须等待挂起函数coroutineScope执行完毕之后才会继续执行

coroutineScope和supervisorScope

协程的启动模式
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,//👈🏻协程的启动模式
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
挂起suspend
全局协程 GlobalScope

全局协程类似守护线程
使用 GlobalScope启动的活动协程并不会保证进程的生命, 会随着线程销毁而销毁

fun main(){
  GlobalScope.launch {
      repeat(100){
          println("111")
          delay(500)
      }
  }
  Thread.sleep(2000)
}
协程的取消和超时
fun main() = runBlocking{
   val currentTimeMillis = System.currentTimeMillis()
   val job = launch {
       var I=0
       var startTime=currentTimeMillis
       while (i<20) {
          if(System.currentTimeMillis()>=startTime){
              println("launch${i++}")
             startTime+=500L
          }
       }
   }
   delay(1300)
   println("main")
   job.cancelAndJoin()
   println("end")
}

结果:


image.png

上面的代码协程实际并没有执行取消

如何让计算代码变为可取消

fun main() = runBlocking{
    val currentTimeMillis = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var I=0
        var startTime=currentTimeMillis
        while (isActive) {
           if(System.currentTimeMillis()>=startTime){
               println("launch${i++}")
              startTime+=500L
           }
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

finally资源清理
join和cancelAndjoin都会等待清理动作完成才会继续往下执行

fun main() = runBlocking{
    val job = launch() {
        try {
       repeat(100){
           println("launch$it")
           delay(500)
       }
        }finally {
          println("执行清理动作")
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

finally之后表示协程已经真正结束被取消了,此时如果使用挂起函数,会导致CancellationException异常

fun main() = runBlocking {
    val job = launch() {
        try {
            repeat(100) {
                println("launch$it")
                delay(500)
            }
        } finally {
            println("执行清理动作")
            delay(1000)
            println("真的结束了吗")//并没有被打印
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

上面代码结果并没有打印"真的结束了吗",原因是:此时的协程已经被取消了。在少数情况下,当我们一个取消的协程中进行挂起函数,怎么办呢?可以使用withContext(NonCancellable) {}

fun main() = runBlocking {
    val job = launch() {
        try {
            repeat(100) {
                println("launch$it")
                delay(500)
            }
        } finally {
            withContext(NonCancellable) {
                println("执行清理动作")
                delay(1000)
                println("真的结束了吗")
            }
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

结果:


image.png

withTimeout和withTimeoutOrNull
withTimeout和withTimeoutOrNull都可以解决超时的问题,区别在于:withTimeoutOrNull在控制台不会抛出异常,而是直接返回null,但是和withTimeout会在控制台抛出异常

fun main() = runBlocking {
    withTimeout(1300) {
        repeat(100) {
            println("launch$it")
            delay(500)
        }
    }
}
fun main() = runBlocking {
    val result = withTimeoutOrNull(1300) {
        repeat(100) {
            println("launch$it")
            delay(500)
        }
    }
    println(result)
}
async/await:Deferred实现并发

用于执行协程任务,并得到执行的结果,结果值是Deferred,这是一个轻量级非阻塞的future,可以在稍后提供一个结果值,可以通过.await()方法获取最终的值

fun main() = runBlocking {
   val job= async {
        delay(1000)
        return@async "Kotlin"
    }
    println("Hello,")
    println(job.await())
    println("end")
}
fun main() = runBlocking {
    val time = measureTimeMillis {
        val a = async(start = CoroutineStart.LAZY) { A() }
        val b = async(start = CoroutineStart.LAZY) { B() }
        println("start")
        Thread.sleep(3000)
        a.start()
        b.start()
        println("${a.await()}+${b.await()}=${a.await() + b.await()}")
    }
    println("时间是$time")
}

private suspend fun A(): Int {
    delay(2000)
    return 2
}

private suspend fun B(): Int {
    delay(3000)
    return 4
}

注意:start是并行,如果没有start而是await则是串行

父子异常取消和异常
fun main() = runBlocking<Unit> {
    try {
        failureSum()
    } catch (e: Exception) {
        println("failureSum failure")
    }
}

private suspend fun failureSum(): Int = coroutineScope {
    val value = async {
        try {
            delay(2000)
            50
        } finally {
            println("value was cancel")
        }
    }
    val failure = async<Int> {
        Thread.sleep(1000)
        println("出现异常了")
        throw Exception()
    }
    value.await() + failure.await()
}

结果:

出现异常了
value was cancel
failureSum failure

Process finished with exit code 0

协程上下文和分发器

fun main() = runBlocking<Unit>{
     launch {
         println("no params,thread:${Thread.currentThread().name}")
     }
    launch(Dispatchers.Unconfined) {
        println("Dispatchers.Unconfined,thread:${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        println("Dispatchers.Default,thread:${Thread.currentThread().name}")
    }
    val thread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(thread) {
        println("SingleThreadExecutor,thread:${Thread.currentThread().name}")
        thread.close()
    }
    GlobalScope.launch {
        println("GlobalScope.launch,thread:${Thread.currentThread().name}")
    }
}

程序分析

协程上下文继承

协程上下文=默认值+继承的CoroutineContext+参数

fun main() {
    runBlocking {
        val coroutineScope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("peakmain"))
        val launch = coroutineScope.launch(Dispatchers.IO) {
            println(Thread.currentThread().name)//👈🏻DefaultDispatcher-worker-1
        }
        launch.join()
    }
}

异常的传播

根协程的异常

fun main() {
    runBlocking {
        val job = GlobalScope.launch {//根协程
            println("Global的异常")
            throw NullPointerException()//控制台打印
        }
        job.join()
        println("joined failed job ")
        val async = GlobalScope.async {//根协程
            println("async 的异常")
            throw IndexOutOfBoundsException()//控制台不打印
        }
        try {
            async.await()
            println("exception")//不打印
        } catch (e: Exception) {
            println("IndexOutOfBoundsException")//打印
        }

    }
}

非根协程的异常

fun main() {
    runBlocking {

        val coroutineScope = CoroutineScope(Job())
        val launch = coroutineScope.launch {
            async {
                println("async 的异常")
                throw IndexOutOfBoundsException()//控制台不打印
            }
        }
        launch.join()
    }
}

如果async抛出异常,launch就会立即抛出异常,而不会等到调用.await

异常的传播性
当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给他的父级。接下来,父级会做以下几件事情:

SupervisorJob

fun main() {
    runBlocking {

        val job = CoroutineScope(SupervisorJob())
        val launch1 = job.launch {
           delay(100)
            println("launch1")
            throw NullPointerException("launch1 exception")
        }
        val launch2 = job.launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("launch2 finished")
            }
        }
        joinAll(launch1,launch2)
    }
}

CoroutineExceptionHandler

fun main() {
    runBlocking {
        val handler = CoroutineExceptionHandler { _, exception ->
            println(exception.message)
        }
        val coroutineScope = CoroutineScope(Job())
        val launch = coroutineScope.launch(handler) {
            launch {
                throw IllegalAccessError("error is null")
            }
        }
        launch.join()
    }
}

Android获取全局异常处理

com.peakmain.project.GlobalCoroutinueExceptionHandler
class GlobalCoroutinueExceptionHandler() :CoroutineExceptionHandler {
    override val key = CoroutineExceptionHandler
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        Log.e("TAG","异常信息:${exception.message}")
    }
}

取消和异常

Flow-异步流

通过flow异步返回多个值

fun main() {
    runBlocking {
        simpleFlow().collect{
            println(it)
        }
    }
}
suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i)//发送,产生一个元素
    }
}

冷流/热流

流的连续性

fun main() {
    runBlocking {
        (1..100).asFlow().filter {
            it%2==0
        }.map {
            "偶数:$it"
        }.collect {
            println(it)
        }
    }
}

流构建器

       flowOf(1,2,3,4,5).onEach {
           delay(1000)
       }.collect {
           println(it)
       }

流上下文

fun main() {
    runBlocking {
        flow<Int> {
            println(Thread.currentThread().name)
            for (i in 1..10){
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .collect {
                println("$it=${Thread.currentThread().name}")//👈🏻下游的线程由整个flow运行的CoroutineContext决定
            }
    }
}

指定协程收集流

fun main() {
    runBlocking {
        flow<Int> {
            println("main:${Thread.currentThread().name}")
            for (i in 1..10){
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .onEach { println(Thread.currentThread().name) }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()

    }
}

flow取消

fun main() {
    runBlocking {
        val job = flow<Int> {
            println("main:${Thread.currentThread().name}")
            for (i in 1..10) {
                delay(1000)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .onEach { println(Thread.currentThread().name) }
            .launchIn(CoroutineScope(Dispatchers.IO))
        delay(2000)
        job.cancel()
        job.join()

    }
}

flow取消检测

fun main() {
    runBlocking {
        (1..10).asFlow().collect {
            println(it)
            if(it==4){
                cancel()
            }
        }
    }
}

结果:

1
2
3
4
5
6
7
8
9
10
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7e0b37bc

我们发现取消失效了

fun main() {
    runBlocking {
        (1..10).asFlow()
            .cancellable().collect {
                println(it)
                if (it == 4) {
                    cancel()
                }
            }
    }
}

通道

协程之间通信

fun main() {
    runBlocking {
        val channel= Channel<Int>()
        launch {
            var i=0
            while (true){
                delay(500)
                channel.send(++i)
                println("发送:$i")
            }
        }
        launch {
             while (true){
                 val element=channel.receive()
                 println("receive $element")
             }
        }

    }
}

容量

迭代

fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.UNLIMITED)//设置容量
        launch {
            for (x in 0..5) {
                channel.send(x)
                println("发送:$x")
            }
        }
        launch {
            val iterator = channel.iterator()
            while (iterator.hasNext()) {
                val element = channel.receive()
                println("receive $element")
                delay(1000)
            }
        }

    }
}

原理

挂起原理

suspend fun A():String{
   delay(2*1000)
   println("delay after")
   return "I am Peakmain"
}
//反编译之后的代码
public final class TestKt {
  @Nullable
// $completion实际是个回掉方法
  public static final Object A(@NotNull Continuation $completion) {
     Object $continuation;
     label20: {
        $continuation = new ContinuationImpl($completion) {
           Object result;
           int label;

           @Nullable
           public final Object invokeSuspend(@NotNull Object $result) {
              this.result = $result;
              this.label |= Integer.MIN_VALUE;
    //系统自己会调用一次自己
              return TestKt.A(this);
           }
        };
     }
     Object $result = ((<undefinedtype>)$continuation).result;
    // 这里的var5实际是COROUTINE_SUSPENDED
     Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
     switch(((<undefinedtype>)$continuation).label) {
     case 0:
        ResultKt.throwOnFailure($result);
    //label修改成1
        ((<undefinedtype>)$continuation).label = 1;
      //DelayKt.delay会回掉ContinuationImpl的invokeSuspend
        if (DelayKt.delay(2000L, (Continuation)$continuation) == COROUTINE_SUSPENDED) {
       //return之后delay方法就不会向下执行
           return var5;
        }
        break;
     case 1:
        ResultKt.throwOnFailure($result);
        break;
     default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
     }

     String var1 = "delay after";
     boolean var2 = false;
     System.out.println(var1);
     return "I am Peakmain";
  }
}

DelayKt.delay源码分析

public final class DelayKt {
  public static final Object delay(long timeMillis, @NotNull Continuation $completion) {//$completion回掉函数
     if (timeMillis <= 0L) {
        return Unit.INSTANCE;
     } else {
        int $i$f$suspendCancellableCoroutine = false;
        int var5 = false;
        CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellable$iv.initCancellability();
        CancellableContinuation cont = (CancellableContinuation)cancellable$iv;
        int var8 = false;
     //getDelay实际是DefaultExecutor
        getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        Object var10000 = cancellable$iv.getResult();
        if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
           DebugProbesKt.probeCoroutineSuspended($completion);
        }
     //主要看返回结果就可以了
        return var10000;
     }
  }
}
   internal fun getResult(): Any? {
       installParentCancellationHandler()
       if (trySuspend()) return COROUTINE_SUSPENDED
        ...
       return getSuccessfulResult(state)
   }
   private fun trySuspend(): Boolean {
       _decision.loop { decision ->
           when (decision) {
            //默认是UNDECIDED
               UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
               RESUMED -> return false
               else -> error("Already suspended")
           }
       }
   }

我们从上面代码可以知道,挂起的本质实际是return,suspend的作用实际是添加一个回调

恢复
协程的核心本质是挂起-恢复,而挂起-恢复的本质实际是return+callback

回到DelayKt.delay中的 scheduleResumeAfterDelay源码
上面我们知道getDelay实际是DefaultExecutor,而DefaultExecutor继承于EventLoopImplBase

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                continuation.disposeOnCancellation(task)
                schedule(now, task)
            }
        }
    }
    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }
   private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                if (firstTask == null) {
                    delayed.timeNow = now
                } else {
          
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
      
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

scheduleTask的作用将对象DelayedTask添加到队列中,并返回SCHEDULE_OK,回到schedule源码中

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            //上面代码分析此时实际回到的是这里
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        }
    }
    protected actual fun unpark() {
        val thread = thread // atomic read
        if (Thread.currentThread() !== thread)
            unpark(thread)
    }
//thread实际是DefaultExector的
    override val thread: Thread
        get() = _thread ?: createThreadSync()
    @Synchronized
    private fun createThreadSync(): Thread {
        return _thread ?: Thread(this, THREAD_NAME).apply {
            _thread = this
            isDaemon = true
            start()
        }
    }
  const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

upark实际最终走到线程的unpark源码

@InlineOnly
internal inline fun unpark(thread: Thread) {
    timeSource?.unpark(thread) ?: LockSupport.unpark(thread)
}

也就是说最终实际调用LockSupport.unpark去唤醒线程,而我们又知道Thread实际是DefaultExecutor,而DefaultExecutor实现了Runnable接口

    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                //是否可被打断
                Thread.interrupted() 
                var parkNanos = processNextEvent()
                if (parkNanos > 0) {
                 
                    if (isShutdownRequested) return
                //实际走到的是线程的LockSupport. parkNanos
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }

实际是就是LockSupport的park和unpack,回到scheduleResumeAfterDelay的DelayedResumeTask的run方法

    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
     //cont是我们的回调
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dc = delegate as? DispatchedContinuation
        resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }
    private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    if (!_state.compareAndSet(state, proposedUpdate)) return@loop
                    dispatchResume(resumeMode)
                    return null
                }
           
        }
    }
    private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        dispatch(mode)
    }
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
    //......
    } else {
        resume(delegate, mode)
    }
}
 delegate.resumeMode(getSuccessfulResult(state), useMode)
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
    when (mode) {
        MODE_ATOMIC_DEFAULT -> resume(value)
        MODE_CANCELLABLE -> resumeCancellable(value)
        MODE_DIRECT -> resumeDirect(value)
        MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
        MODE_IGNORE -> {}
        else -> error("Invalid mode $mode")
    }
}
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

实际会走到Continuation.resumeWith方法,而它的实现类实际是BaseContinuationImpl的.resumeWith方法

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                     //走到我们实现类的ContinuationImpl的invokeSuspend
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminatin
            }
        }
    }

总结:
1、suspend实际是编译后添加一个Continuation的回掉
2、内部会创建一个ContinuationImpl方法并实现invokeSuspend方法
3、挂起实际是return了
4、恢复最终实际会调用ContinuationImpl. invokeSuspend方法,之后再次调用自己创建的类

恢复原理逆向还原

ublic class Test {

    public static final Object A(Continuation continuation) {
        ContinuationImpl callback;
        if(!(continuation instanceof ContinuationImpl)&&(((ContinuationImpl) continuation).lable==0)){
            callback = new ContinuationImpl(continuation) {
                @Override
                Object invokeSuspend(@NotNull Object result) {
                    this.result = result;
                    this.lable |= Integer.MIN_VALUE;
                    return A(this);
                }
            };
        }else{
            callback= (ContinuationImpl) continuation ;
        }
        switch (callback.lable){
            case 0:
                Object delay = DelayKt.delay(2000, callback);
                if(delay== IntrinsicsKt.getCOROUTINE_SUSPENDED()){
                    return IntrinsicsKt.getCOROUTINE_SUSPENDED();
                }
                break;
        }
        String var1 = "delay after";
        System.out.println(var1);
        return "I am peakmain";
    }

    abstract static class ContinuationImpl<T> implements Continuation<T> {
        private Continuation mCallback;
        int lable;
        Object result;

        public ContinuationImpl(Continuation callback) {

            mCallback = callback;
        }

        @NotNull
        @Override
        public CoroutineContext getContext() {
            return mCallback.getContext();
        }

        @Override
        public void resumeWith(@NotNull Object result) {
            Object suspend = invokeSuspend(result);
            if(suspend==IntrinsicsKt.getCOROUTINE_SUSPENDED())return;
            mCallback.resumeWith(result);

        }

        abstract Object invokeSuspend(@NotNull Object result);
    }
}

协程的挂起恢复本质是return+callback

上一篇 下一篇

猜你喜欢

热点阅读