kotlin知识

Kotlin协程系列之基础设施

2022-09-22  本文已影响0人  _Jun

前一篇文章介绍了kotlin协程的历史及现状,接下来就介绍一下kotlin协程吧。

kotlin协程分为标准库提供的基础设施以及官方协程库两部分,标准库的基础设施依照协程的思想提供基本的挂起、恢复接口,官方协程库基于这些接口提供丰富实用的功能。

今天先介绍一下kotlin协程的基础设施。为了让大家对kotlin协程有更为直观的认识,先展示一个demo:

// 代码清单2-1
val continuation = suspend {
    println("simpleTest: In Coroutine")
    5
}.createCoroutine(object : Continuation<Int> {
    override fun resumeWith(result: Result<Int>) {
        println("resumeWith: Continuation End: $result")
    }

    override val context: CoroutineContext
    get() = EmptyCoroutineContext

})
continuation.resume(Unit)

demo由三个部分组成:

suspend函数作为协程体,输出一条日志,并且返回5作为执行结果。协程体执行完成后,回调Continuation的resumeWith方法,其内输出协程的执行结果。

suspend函数

suspend函数能在不阻塞线程执行的情况下将协程挂起,适合用于将异步代码转为同步形式。在suspend函数中可以调用普通函数和suspend函数,然而在普通函数中却不能调用suspend函数(由于编译时存在CPS转换,后续讲协程实现时详细说明)。

当suspend函数被真正挂起的时候,对应的调用处被乘坐挂起点

suspend函数内部通过调用suspendCoroutine方法实现挂起操作的,如果没有调用suspendCoroutine,那么suspend函数并不会真正的挂起,前面的demo在实际运行时就不会挂起。

suspendCoroutine的函数签名如下:

// 代码清单2-2
suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

它是个内联函数,其中block是参数为Continuation的回调函数,在异步任务完成时,早block内部调用continuation.resumeWith()或者调用continuation.resume()continuation.resumeWithException()恢复被挂起的协程。

下面看一个suspend函数的例子:

// 代码清单2-3
suspend fun suspendFun1() {
    println("suspendFun1")
}

suspend fun suspendFun2() = suspendCoroutine<Int> {
    thread {
        sleep(50)
        it.resume(65)
    }
}

fun sleep(time:Long) {
    try {
        Thread.sleep(time)
    } catch (e: InterruptedException) {
        e.printStackTrace()
    }
}

suspend {
    suspendFun1()
    val res2 = suspendFun2()
    println("suspendFun2 result $res2")
    res2 + 3
}.startCoroutine(object : Continuation<Int> {
    override val context: CoroutineContext
    get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<Int>) {
        println("continuationInterceptorTest resume $result")
    }

})

suspendFun1不会真正执行挂起逻辑,而suspendFun2通过suspendCoroutine创建一个真正的挂起函数,并在内部开启子线程模拟耗时任务执行。所以在执行到suspendFun2时,当前协程先被挂起,等待模拟耗时操作完成后,通过continuation的resume方法恢复被挂起的协程。

构建协程

在代码清单2-1中,我们使用createCoroutinecontinuation.resume创建并执行协程,当然我们还可以使用startCoroutine直接创建并启动协程:

// 代码清单2-4
suspend {
    println("simpleTest: In Coroutine with start")
    6
}.startCoroutine(object : Continuation<Int> {
    override val context: CoroutineContext
    get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<Int>) {
        println("resumeWith: Continuation with start End: $result")
        println()
    }

})

可以看出startCoroutine用法和createCoroutine类似,下面就看看它们的函数签名吧:

// 代码清单2-5
fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit>

fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
)

startCoroutinecreateCoroutine都是作为 (suspend () -> T)函数类型的扩展函数定义的,因此在代码清单2-1和2-4中可以直接在suspend函数后直接调用,并且它们都接收一个Continuation类型实例作为回调,当协程执行完成后,会调用completion的resumeWith函数,并将协程执行结果通过result参数传递。

标准库还定义了带Receiver的startCoroutinecreateCoroutine,用于将协程体的作用域设置成Receiver:

// 代码清单2-6
fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
)

fun <R, T> (suspend R.() -> T).startCoroutine(
    receiver: R,
    completion: Continuation<T>
)

Continuation

startCoroutinecreateCoroutine创建协程时都用到Continuation对象,它用来记录被挂起的协程在挂起点的状态,其内保存着挂起点之后要执行的代码。考虑如下序列生成器:

// 代码清单2-7
sequence {
    for (i in 1..10) yield(i * i)
    println("over")
}  

该序列生成器在for循环内调用yield生成新的序列,并将当前协程挂起,所以在yield内部一定会有Continuation记录剩余要执行的代码,并且有十个Continuation对象:第一次执行i=2和循环并挂起,第二次执行i=3并挂起,以此逻辑依次执行。当协程被创建,但还没开始运行时,即调用了createCoroutine后未调用resume,此时存在一个初始的Continuation<Unit>表示所有的协程代码。

下面我们看看Continuation的定义:

// 代码清单2-8
interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)
}

context表示当前协程的上下文,用户可以根据需求自定义上下文,稍后会详细介绍。在前面的demo中,我们都是直接使用EmptyCoroutineContext,这是一个自带的上下文,无特殊需求时可以用它。

resumeWith函数是协程的完成回调,不论协程执行成功或失败,都通过此函数通知用户。为了方便使用,kotlin提供了两个扩展函数:

// 代码清单2-9
fun <T> Continuation<T>.resume(value: T)
fun <T> Continuation<T>.resumeWithException(exception: Throwable)

resume作为成功通知,resumeWithException作为失败通知。

上下文

协程上下文类似Set集合,用于保存于协程关联的自定义数据:可以包含协程的线程策略、日志信息、协程安全和事务相关信息、协程id及名字等等。可以将协程当做轻量级线程,那么协程上下文就类似线程的ThreadLocal变量,不过ThreadLocal是可变的,而协程上下文是不可变的。

协程上下文在kotlin中用CoroutineContext表示,这是一个集合,下面是其在标准库中的定义:

// 代码清单2-10
interface CoroutineContext {
    operator fun <E : Element> get(key: Key<E>): E?
    fun <R> fold(initial: R, operation: (R, Element) -> R): R
    operator fun plus(context: CoroutineContext): CoroutineContext
    fun minusKey(key: Key<*>): CoroutineContext

    interface Element : CoroutineContext {
        val key: Key<*>
    }

    interface Key<E : Element>
}

从定义中可以看出,CoroutineContext由Element组成,Element仅包含字段key,明显key是作为Element的索引。

Key接口通过泛型将Key与Element关联起来,可以看做Key即为Element本身,这与Set类似。然而CoroutineContext重载了操作符[],即代码清单2-10中的get函数,它接收Key类型索引,而返回Element类型的元素,从这方面看,CoroutineContext又喝Map相似。

foldplusminusKey属于集合操作,这里就不详细介绍了,请自行查阅文档。

总之,CoroutineContext是一种混合了Set和Map结构的新型集合。

EmptyCoroutineContext

EmptyCoroutineContext 是标准库提供的一个不包含任何数据的CoroutineContext实例,在没有特殊需求时可以使用此实例,其定义如下:

// 代码清单2-11
public object EmptyCoroutineContext : CoroutineContext, Serializable {
    private const val serialVersionUID: Long = 0
    private fun readResolve(): Any = EmptyCoroutineContext

    public override fun <E : Element> get(key: Key<E>): E? = null
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
    public override fun plus(context: CoroutineContext): CoroutineContext = context
    public override fun minusKey(key: Key<*>): CoroutineContext = this
    public override fun hashCode(): Int = 0
    public override fun toString(): String = "EmptyCoroutineContext"
}

实现

在实现自定义上下文时,不能直接实现CoroutineContext接口,标准库提供了AbstractCoroutineContextElement,应该实现此类,其定义如下:

// 代码清单2-12
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

AbstractCoroutineContextElement内部通过重写Element接口的key字段来指定具体的Element类型。

下面看一个自定义上下文的demo:

// 代码清单2-13
class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<CoroutineName>
}

var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c_test")

suspend {
    println("coroutineContextTest: In Coroutine ${coroutineContext[CoroutineName]?.name} with start")
}.startCoroutine(object : Continuation<Int> {
    override val context: CoroutineContext
    get() = coroutineContext

    override fun resumeWith(result: Result<Int>) {
        println("resumeWith: Continuation with start End: $result")
    }
})

CoroutineName中通过伴生对象志明Key类型,并且通过父类构造函数将其传递上去,以指定CoroutineName所对应的Element类型。

接下去创建CoroutineName实例,并加到EmptyCoroutineContext中,最后再协程体内部通过coroutineContext以及对应的Key(即伴生对象CoroutineName),即可获的CoroutineName的实例。

拦截器

在android代码中,更新UI都要在主线程中执行,当发起网络请求或其他耗时操作都会切到子线程中执行,而suspend函数恢复执行依赖调用continuation.resumeWIth()所在的线程,这样就要手动切换线程,容易引发bug。

ContinuationInterceptor提供了拦截并重新包装Continuation实例的能力,通过重新包装Continuation实例,就可以实现自定义的需求,比如每次都自动切换回主线程执行。

ContinuationInterceptor接口定义如下:

// 代码清单2-14
interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    fun releaseInterceptedContinuation(continuation: Continuation<*>)
}

通过实现interceptContinuation方法重新包装原始的continuation,以实现自定义的需求。下面我们看一个在协程恢复时添加日志的拦截器示例:

// 代码清单2-15
class LogInterceptor() : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        LogContinuation(continuation)
}

class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
    override fun resumeWith(result: Result<T>) {
        println("before resumeWith: $result")
        continuation.resumeWith(result)
        println("after resumeWith")
    }
}

fun continuationInterceptorTest() {
    suspend {
        suspendFun1()
        val res2 = suspendFun2()
        println("suspendFun2 result $res2")
        res2 + 3
    }.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext
            get() = LogInterceptor()

        override fun resumeWith(result: Result<Int>) {
            println("continuationInterceptorTest resume $result")
        }

    })
}

suspend fun suspendFun1() {
    println("suspendFun1")
}

suspend fun suspendFun2() = suspendCoroutine<Int> {
    thread {
        sleep(50)
        it.resume(65)
    }
}

demo中首先定义了LogInterceptorLogContinuation两个类。LogContinuation接受Continuation作为构造函数参数,通过委托方式实现Continuation接口,在resumeWith中添加日志以记录协程的执行记录。

continuationInterceptorTest中有suspendFun1suspendFun2两个挂起函数,由于suspendFun1并没有真正挂起,所以在执行suspendFun1时没有LogContinuation的日志,suspendFun2则是在调用it.resume(65)时会打印相关的日志。

参考链接

作者:Longlongago
链接:https://juejin.cn/post/7139893784355012615

上一篇下一篇

猜你喜欢

热点阅读