Kotlin协程系列之基础设施
前一篇文章介绍了kotlin协程的历史及现状,接下来就介绍一下kotlin协程吧。
kotlin协程分为标准库提供的基础设施以及官方协程库两部分,标准库的基础设施依照协程的思想提供基本的挂起、恢复接口,官方协程库基于这些接口提供丰富实用的功能。
今天先介绍一下kotlin协程的基础设施。为了让大家对kotlin协程有更为直观的认识,先展示一个demo:
// 代码清单2-1
val continuation = suspend {
println("simpleTest: In Coroutine")
5
}.createCoroutine(object : Continuation<Int> {
override fun resumeWith(result: Result<Int>) {
println("resumeWith: Continuation End: $result")
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
})
continuation.resume(Unit)
demo由三个部分组成:
- 由suspend声明的挂起函数
- 调用
createCoroutine
方法创建Continuation实例 - 通过
continuation.resume
启动协程
suspend函数作为协程体,输出一条日志,并且返回5作为执行结果。协程体执行完成后,回调Continuation的resumeWith方法,其内输出协程的执行结果。
suspend函数
suspend函数能在不阻塞线程执行的情况下将协程挂起,适合用于将异步代码转为同步形式。在suspend函数中可以调用普通函数和suspend函数,然而在普通函数中却不能调用suspend函数(由于编译时存在CPS转换,后续讲协程实现时详细说明)。
当suspend函数被真正挂起的时候,对应的调用处被乘坐挂起点。
suspend函数内部通过调用suspendCoroutine
方法实现挂起操作的,如果没有调用suspendCoroutine
,那么suspend函数并不会真正的挂起,前面的demo在实际运行时就不会挂起。
suspendCoroutine
的函数签名如下:
// 代码清单2-2
suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
它是个内联函数,其中block是参数为Continuation的回调函数,在异步任务完成时,早block内部调用continuation.resumeWith()
或者调用continuation.resume()
和continuation.resumeWithException()
恢复被挂起的协程。
下面看一个suspend函数的例子:
// 代码清单2-3
suspend fun suspendFun1() {
println("suspendFun1")
}
suspend fun suspendFun2() = suspendCoroutine<Int> {
thread {
sleep(50)
it.resume(65)
}
}
fun sleep(time:Long) {
try {
Thread.sleep(time)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
suspend {
suspendFun1()
val res2 = suspendFun2()
println("suspendFun2 result $res2")
res2 + 3
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("continuationInterceptorTest resume $result")
}
})
suspendFun1不会真正执行挂起逻辑,而suspendFun2通过suspendCoroutine
创建一个真正的挂起函数,并在内部开启子线程模拟耗时任务执行。所以在执行到suspendFun2时,当前协程先被挂起,等待模拟耗时操作完成后,通过continuation的resume方法恢复被挂起的协程。
构建协程
在代码清单2-1中,我们使用createCoroutine
和continuation.resume
创建并执行协程,当然我们还可以使用startCoroutine
直接创建并启动协程:
// 代码清单2-4
suspend {
println("simpleTest: In Coroutine with start")
6
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("resumeWith: Continuation with start End: $result")
println()
}
})
可以看出startCoroutine
用法和createCoroutine
类似,下面就看看它们的函数签名吧:
// 代码清单2-5
fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>
fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
)
startCoroutine
和createCoroutine
都是作为 (suspend () -> T)
函数类型的扩展函数定义的,因此在代码清单2-1和2-4中可以直接在suspend函数后直接调用,并且它们都接收一个Continuation类型实例作为回调,当协程执行完成后,会调用completion的resumeWith
函数,并将协程执行结果通过result参数传递。
标准库还定义了带Receiver的startCoroutine
和createCoroutine
,用于将协程体的作用域设置成Receiver:
// 代码清单2-6
fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
)
fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
)
Continuation
用startCoroutine
或createCoroutine
创建协程时都用到Continuation对象,它用来记录被挂起的协程在挂起点的状态,其内保存着挂起点之后要执行的代码。考虑如下序列生成器:
// 代码清单2-7
sequence {
for (i in 1..10) yield(i * i)
println("over")
}
该序列生成器在for循环内调用yield生成新的序列,并将当前协程挂起,所以在yield内部一定会有Continuation记录剩余要执行的代码,并且有十个Continuation对象:第一次执行i=2
和循环并挂起,第二次执行i=3
并挂起,以此逻辑依次执行。当协程被创建,但还没开始运行时,即调用了createCoroutine
后未调用resume,此时存在一个初始的Continuation<Unit>
表示所有的协程代码。
下面我们看看Continuation的定义:
// 代码清单2-8
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
context表示当前协程的上下文,用户可以根据需求自定义上下文,稍后会详细介绍。在前面的demo中,我们都是直接使用EmptyCoroutineContext
,这是一个自带的上下文,无特殊需求时可以用它。
resumeWith
函数是协程的完成回调,不论协程执行成功或失败,都通过此函数通知用户。为了方便使用,kotlin提供了两个扩展函数:
// 代码清单2-9
fun <T> Continuation<T>.resume(value: T)
fun <T> Continuation<T>.resumeWithException(exception: Throwable)
resume
作为成功通知,resumeWithException
作为失败通知。
上下文
协程上下文类似Set集合,用于保存于协程关联的自定义数据:可以包含协程的线程策略、日志信息、协程安全和事务相关信息、协程id及名字等等。可以将协程当做轻量级线程,那么协程上下文就类似线程的ThreadLocal变量,不过ThreadLocal是可变的,而协程上下文是不可变的。
协程上下文在kotlin中用CoroutineContext
表示,这是一个集合,下面是其在标准库中的定义:
// 代码清单2-10
interface CoroutineContext {
operator fun <E : Element> get(key: Key<E>): E?
fun <R> fold(initial: R, operation: (R, Element) -> R): R
operator fun plus(context: CoroutineContext): CoroutineContext
fun minusKey(key: Key<*>): CoroutineContext
interface Element : CoroutineContext {
val key: Key<*>
}
interface Key<E : Element>
}
从定义中可以看出,CoroutineContext
由Element组成,Element仅包含字段key,明显key是作为Element的索引。
Key接口通过泛型将Key与Element关联起来,可以看做Key即为Element本身,这与Set类似。然而CoroutineContext
重载了操作符[]
,即代码清单2-10中的get函数,它接收Key类型索引,而返回Element类型的元素,从这方面看,CoroutineContext
又喝Map相似。
fold
、plus
、minusKey
属于集合操作,这里就不详细介绍了,请自行查阅文档。
总之,CoroutineContext
是一种混合了Set和Map结构的新型集合。
EmptyCoroutineContext
EmptyCoroutineContext
是标准库提供的一个不包含任何数据的CoroutineContext
实例,在没有特殊需求时可以使用此实例,其定义如下:
// 代码清单2-11
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = EmptyCoroutineContext
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"
}
实现
在实现自定义上下文时,不能直接实现CoroutineContext
接口,标准库提供了AbstractCoroutineContextElement
,应该实现此类,其定义如下:
// 代码清单2-12
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
AbstractCoroutineContextElement
内部通过重写Element接口的key字段来指定具体的Element类型。
下面看一个自定义上下文的demo:
// 代码清单2-13
class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineName>
}
var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c_test")
suspend {
println("coroutineContextTest: In Coroutine ${coroutineContext[CoroutineName]?.name} with start")
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = coroutineContext
override fun resumeWith(result: Result<Int>) {
println("resumeWith: Continuation with start End: $result")
}
})
CoroutineName
中通过伴生对象志明Key类型,并且通过父类构造函数将其传递上去,以指定CoroutineName
所对应的Element类型。
接下去创建CoroutineName
实例,并加到EmptyCoroutineContext
中,最后再协程体内部通过coroutineContext
以及对应的Key(即伴生对象CoroutineName
),即可获的CoroutineName
的实例。
拦截器
在android代码中,更新UI都要在主线程中执行,当发起网络请求或其他耗时操作都会切到子线程中执行,而suspend函数恢复执行依赖调用continuation.resumeWIth()
所在的线程,这样就要手动切换线程,容易引发bug。
ContinuationInterceptor
提供了拦截并重新包装Continuation
实例的能力,通过重新包装Continuation
实例,就可以实现自定义的需求,比如每次都自动切换回主线程执行。
ContinuationInterceptor
接口定义如下:
// 代码清单2-14
interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
fun releaseInterceptedContinuation(continuation: Continuation<*>)
}
通过实现interceptContinuation
方法重新包装原始的continuation,以实现自定义的需求。下面我们看一个在协程恢复时添加日志的拦截器示例:
// 代码清单2-15
class LogInterceptor() : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith: $result")
continuation.resumeWith(result)
println("after resumeWith")
}
}
fun continuationInterceptorTest() {
suspend {
suspendFun1()
val res2 = suspendFun2()
println("suspendFun2 result $res2")
res2 + 3
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = LogInterceptor()
override fun resumeWith(result: Result<Int>) {
println("continuationInterceptorTest resume $result")
}
})
}
suspend fun suspendFun1() {
println("suspendFun1")
}
suspend fun suspendFun2() = suspendCoroutine<Int> {
thread {
sleep(50)
it.resume(65)
}
}
demo中首先定义了LogInterceptor
和LogContinuation
两个类。LogContinuation
接受Continuation
作为构造函数参数,通过委托方式实现Continuation
接口,在resumeWith
中添加日志以记录协程的执行记录。
在continuationInterceptorTest
中有suspendFun1
和suspendFun2
两个挂起函数,由于suspendFun1
并没有真正挂起,所以在执行suspendFun1
时没有LogContinuation
的日志,suspendFun2
则是在调用it.resume(65)
时会打印相关的日志。
参考链接
作者:Longlongago
链接:https://juejin.cn/post/7139893784355012615