【Kotlin回顾】19.Kotlin协程—CoroutineS

2022-12-09  本文已影响0人  代码我写的怎么

1.CoroutineScope

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

fun coroutineScopeTest() {
    val scope = CoroutineScope(Job())
    scope.launch {

    }

    scope.async {

    }
}

asynclaunch的扩展接收者都是CoroutineScope,这就意味着他们等价于CoroutineScope的成员方法,如果要调用就必须先获取到CoroutineScope的对象。

public interface CoroutineScope {

    /**
     * 此作用域的上下文
     * Context被作用域封装,用于实现作为作用域扩展的协程构建器
     * 不建议在普通代码中访问此属性,除非访问[Job]实例以获得高级用法
     */
    public val coroutineContext: CoroutineContext
}

CoroutineScope是一个接口,这个接口所做的也只是对CoroutineContext做了一层封装而已。CoroutineScope最大的作用就是可以方便的批量的控制协程,例如结构化并发。

2.CoroutineScope与结构化并发

fun coroutineScopeTest() {
    val scope = CoroutineScope(Job())
    scope.launch {
        launch {
            delay(1000000L)
            logX("ChildLaunch 1")
        }
        logX("Hello 1")
        delay(1000000L)
        logX("Launch 1")
    }

    scope.launch {
        launch {
            delay(1000000L)
            logX("ChildLaunch 2")
        }
        logX("Hello 2")
        delay(1000000L)
        logX("Launch 2")
    }

    Thread.sleep(1000L)
    scope.cancel()
}

//输出结果:
//================================
//Hello 2 
//Thread:DefaultDispatcher-worker-2
//================================
//================================
//Hello 1 
//Thread:DefaultDispatcher-worker-1
//================================

上面的代码实现了结构化,只是创建了CoroutineScope(Job())和利用launch启动了几个协程就实现了结构化,结构如图所示,那么它的父子结构是如何建立的?

3.父子关系是如何建立的

这里要先说明一下为什么CoroutineScope是一个接口,可是在创建的时候却可以以构造函数的方式使用。在Kotlin中的命名规则是以【驼峰法】为主的,在特殊情况下是可以打破这个规则的,CoroutineScope就是一个特殊的情况,它是一个顶层函数但它发挥的作用却是构造函数,同样的还有Job(),它也是顶层函数,在Kotlin中当顶层函数被用作构造函数的时候首字母都是大写的。

再来看一下CoroutineScope作为构造函数使用时的源码:

/**
 * 创建一个[CoroutineScope],包装给定的协程[context]。
 * 
 * 如果给定的[context]不包含[Job]元素,则创建一个默认的' Job() '。
 * 
 * 这样,任何子协程在这个范围或[取消][协程]失败。就像在[coroutineScope]块中一样,
 * 作用域本身会取消作用域的所有子作用域。
 */
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

构造函数的CoroutineScope传入一个参数,这个参数如果包含Job元素则直接使用,如果不包含Job则会创建一个新的Job,这就说明每一个coroutineScope对象中的 Context中必定会存在一个Job对象。而在创建一个CoroutineScope对象时这个Job()是一定要传入的,因为CoroutineScope就是通过这个Job()对象管理协程的。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

上面的代码是launch的源码,分析一下LazyStandaloneCoroutineStandaloneCoroutine

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    private val continuation = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}

StandaloneCoroutineAbstractCoroutine子类,AbstractCoroutine协程的抽象类, 里面的参数initParentJob = true表示协程创建之后需要初始化协程的父子关系。LazyStandaloneCoroutineStandaloneCoroutine的子类,active=false使命它是以懒加载的方式创建协程。

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    init {
        /**
         * 在上下文中的父协程和当前协程之间建立父子关系
         * 如果父协程已经被取消他可能导致当前协程也被取消
         * 如果协程从onCancelled或者onCancelling内部操作其状态,
         * 那么此时建立父子关系是危险的
         */
        if (initParentJob) initParentJob(parentContext[Job])
    }
}

AbstractCoroutine是一个抽象类他继承了JobSupport,而JobSupportJob的具体实现。

init函数中根据initParentJob判断是否建立父子关系,initParentJob的默认值是true因此if中的initParentJob()函数是一定会执行的,这里的parentContext[Job]取出的的Job就是在Demo中传入的Job

initParentJobJobSupport中的方法,因为AbstractCoroutine继承自JobSupport,所以进入JobSupport分析这个方法。

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    final override val key: CoroutineContext.Key<*> get() = Job

    /**
     * 初始化父类的Job
     * 在所有初始化之后最多调用一次
     */
    protected fun initParentJob(parent: Job?) {
        assert { parentHandle == null }
        //①
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        //②
        parent.start() // 确保父协程已经启动
        @Suppress("DEPRECATION")
        //③
        val handle = parent.attachChild(this)
        parentHandle = handle
        // 检查注册的状态
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle 
        }
    }

}

上面的源码initParentJob中添加了三处注释,现在分别对这三处注释进行分析:

用一句话来概括这个关系就是:每一个协程都有一个Job,每一个Job又有一个父Job和多个子Job,可以看做是一个树状结构。这个关系可以用下面这张图表示:

4.结构化是如何取消的

结构化可以被创建的同时CoroutineScope还提供了可取消的函数,Demo中通过scope.cancel()取消了协程,它的流程又是怎样的呢?先从scope.cancel中的cancel看起

/**
 * 取消这个scope,包含当前Job和子Job
 * 如果没有Job,可抛出异常IllegalStateException
 */
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

scope.cancel又是通过job.cancel取消的,这个cancel具体实现是在JobSupport

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    ...

    public override fun cancel(cause: CancellationException?) {
        cancelInternal(cause ?: defaultCancellationException())
    }

    public open fun cancelInternal(cause: Throwable) {
        cancelImpl(cause)
    }

    /**
     * 当cancelChild被调用的时候cause是Throwable或者ParentJob
     * 如果异常已经被处理则返回true,否则返回false
     */
    internal fun cancelImpl(cause: Any?): Boolean {
        var finalState: Any? = COMPLETING_ALREADY
        if (onCancelComplete) {
            // 确保它正在完成,如果返回状态是 cancelMakeCompleting 说明它已经完成
            finalState = cancelMakeCompleting(cause)
            if (finalState === COMPLETING_WAITING_CHILDREN) return true
        }
        if (finalState === COMPLETING_ALREADY) {
            //转换到取消状态,当完成时调用afterCompletion
            finalState = makeCancelling(cause)
        }
        return when {
            finalState === COMPLETING_ALREADY -> true
            finalState === COMPLETING_WAITING_CHILDREN -> true
            finalState === TOO_LATE_TO_CANCEL -> false
            else -> {
                afterCompletion(finalState)
                true
            }
        }
    }

    /**
     * 如果没有需要协程体完成的任务返回true并立即进入完成状态等待子类完成
     * 这里代表的是当前Job是否有协程体需要执行
     */
    internal open val onCancelComplete: Boolean get() = false
}

job.cancel最终调用的是JobSupport中的cancelImpl。这里它分为两种情况,判断依据是onCancelComplete,代表的就是当前Job是否有协程体需要执行,如果没有则返回true。这里的Job是自己创建的且没有需要执行的协程代码因此返回结果是true,所以就执行cancelMakeCompleting表达式。

private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { state ->
        ...
        val finalState = tryMakeCompleting(state, proposedUpdate)
        if (finalState !== COMPLETING_RETRY) return finalState
    }
}

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    ...
    return tryMakeCompletingSlowPath(state, proposedUpdate)
}

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    //获取状态列表或提升为列表以正确操作子列表
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    ...
    notifyRootCause?.let { notifyCancelling(list, it) }
    ...
    return finalizeFinishingState(finishing, proposedUpdate)
}

进入cancelMakeCompleting后经过多次流转最终会调用tryMakeCompletingSlowPath中的notifyCancelling,在这个函数中才是执行子Job和父Job取消的最终流程

private fun notifyCancelling(list: NodeList, cause: Throwable) {
    //首先取消子Job
    onCancelling(cause)
    //通知子Job
    notifyHandlers<JobCancellingNode>(list, cause)
    // 之后取消父Job
    cancelParent(cause) // 试探性取消——如果没有parent也没关系
}
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
    var exception: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            exception?.apply { addSuppressedThrowable(ex) } ?: run {
                exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    exception?.let { handleOnCompletionException(it) }
}

notifyHandlers中的流程就是遍历当前Job的子Job,并将取消的cause传递过去,这里的invoke()最终会调用 ChildHandleNodeinvoke()方法

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    ...

    internal class ChildHandleNode(
        @JvmField val childJob: ChildJob
    ) : JobCancellingNode(), ChildHandle {
        override val parent: Job get() = job
        override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
        override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
    }

    public final override fun parentCancelled(parentJob: ParentJob) {
        cancelImpl(parentJob)
    }
}

childJob.parentCancelled(job)的调用最终调用的是JobSupport中的parentCanceled()函数,然后又回到了cancelImpl()中,也就是 Job 取消的入口函数。这实际上就相当于在做递归调用

Job取消完成后接着就是取消父Job了,进入到cancelParent()函数中

/**
 * 取消Job时调用的方法,以便可能将取消传播到父类。
 * 如果父协程负责处理异常,则返回' true ',否则返回' false '。
 */ 
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* 
    * CancellationException被认为是“正常的”,当子协程产生它时父协程通常不会被取消。
    * 这允许父协程取消它的子协程(通常情况下),而本身不会被取消,
    * 除非子协程在其完成期间崩溃并产生其他异常。
    */
    val isCancellation = cause is CancellationException
    val parent = parentHandle

    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }

    // 责任链模式
    return parent.childCancelled(cause) || isCancellation
}

/**
 * 在这个方法中,父类决定是否取消自己(例如在重大故障上)以及是否处理子类的异常。
 * 如果异常被处理,则返回' true ',否则返回' false '(调用者负责处理异常)
 */
public open fun childCancelled(cause: Throwable): Boolean {
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

cancelParent的返回结果使用了责任链模式, 如果返回【true】表示父协程处理了异常,返回【false】则表示父协程没有处理异常。

当异常是CancellationException时如果是子协程产生的父协程不会取消,或者说父协程会忽略子协程的取消异常,如果是其他异常父协程就会响应子协程的取消了。

5.总结

作者:无糖可乐爱好者
链接:https://juejin.cn/post/7175334344049819705

上一篇下一篇

猜你喜欢

热点阅读