Kotlin协程实现原理:CoroutineScope&Job
今天我们来聊聊Kotlin
的协程Coroutine
。
如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?
如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。
Kotlin协程实现原理:Suspend&CoroutineContext
如果你已经接触过协程,相信你都有过以下几个疑问:
- 协程到底是个什么东西?
- 协程的
suspend
有什么作用,工作原理是怎样的? - 协程中的一些关键名称(例如:
Job
、Coroutine
、Dispatcher
、CoroutineContext
与CoroutineScope
)它们之间到底是怎么样的关系? - 协程的所谓非阻塞式挂起与恢复又是什么?
- 协程的内部实现原理是怎么样的?
- ...
接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。
CoroutineScope
CoroutineScope
是什么?如果你觉得陌生,那么GlobalScope
、lifecycleScope
与viewModelScope
相信就很熟悉了吧(当然这个是针对于Android
开发者)。它们都实现了CoroutineScope
接口。
public interface CoroutineScope {
/**
* The context of this scope.
* Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
* Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
*
* By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
*/
public val coroutineContext: CoroutineContext
}
CoroutineScope
中只包含一个待实现的变量CoroutineContext
,至于CoroutineContext
之前的文章已经分析了它的内部结构,这里就不再累赘了。
通过它的结构,我们可以认为它是提供CoroutineContext
的容器,保证CoroutineContext
能在整个协程运行中传递下去,约束CoroutineContext
的作用边界。
例如,在Android
中使用协程来请求数据,当接口还没有请求完成时Activity
就已经退出了,这时如果不停止正在运行的协程将会造成不可预期的后果。所以在Activity
中我们都推荐使用lifecycleScope
来启动协程,lifecycleScope
可以让协程具有与Activity
一样的生命周期意识。
下面是lifecycleScope
源码:
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
它创建了一个LifecycleCoroutineScopeImpl
实例,它实现了CoroutineScope
接口,同时传入SupervisorJob() + Dispatchers.Main
作为它的CoroutineContext
。
我们再来看它的register()
方法
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
init {
// in case we are initialized on a non-main thread, make a best effort check before
// we return the scope. This is not sync but if developer is launching on a non-main
// dispatcher, they cannot be 100% sure anyways.
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
coroutineContext.cancel()
}
}
fun register() {
// TODO use Main.Immediate once it is graduated out of experimental.
launch(Dispatchers.Main) {
if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
} else {
coroutineContext.cancel()
}
}
}
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
在register
方法中通过经典的launch
来创建一个协程,而launch
使用到的CoroutineContext
就是CoroutineSope
中的CoroutineContext
。然后在协程中结合Jetpack
的Lifecycle
特性来监听Activiyt
的生命周期。
如果对
Lifecycle
的使用与特性还不是很了解的,推荐阅读这篇入门级文章Android Architecture Components Part3:Lifecycle
意思就是说在Activity
销毁的时候会调用下面的方法取消协程的运行。
coroutineContext.cancel()
这里就使用到了CoroutineContext
,经过上篇文章的分析我们很容易知道CoroutineContext
自身是没有cancel
方法的,所以这个cancel
方法是CoroutineContext
的扩展方法。
public fun CoroutineContext.cancel(): Unit {
this[Job]?.cancel()
}
所以真正的逻辑是从CoroutineContex
集合中取出Key
为Job
的实例,这个对应的就是上面创建LifecycleCoroutineScopeImpl
实例时传入的SupervisorJob
,它是CoroutineContext
的其中一个子类。
这时再来看lifecycleScope
相关的一些方法
lifecycleScope.launchWhenCreated { }
lifecycleScope.launchWhenStarted { }
lifecycleScope.launchWhenResumed { }
这些方法的内部逻辑就很明显了,也就是通过Lifecycle
来追踪Activity
的生命周期,从而约束协程运行的时机。
我们也可以不使用lifecycleScope
,自己实现一个CoroutineScope
,让它在Activity
达到同样的效果。
class MyActivity : AppCompatActivity(), CoroutineScope {
lateinit var job: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
}
/*
* Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
* in this method throws an exception, then all nested coroutines are cancelled.
*/
fun loadDataFromUI() = launch { // <- extension on current activity, launched in the main thread
val ioData = async(Dispatchers.IO) { // <- extension on launch scope, launched in IO dispatcher
// blocking I/O operation
}
// do something else concurrently with I/O
val data = ioData.await() // wait for result of I/O
draw(data) // can draw in the main thread
}
}
上面的实现也能够保证当前Activiyt
中的协程在Activity
销毁的时候终止协程的运行。
到这里CoroutineScope
的作用就呼之欲出了,它就是用来约束协程的边界,能够很好的提供对应的协程取消功能,保证协程的运行范围。
当然这又引申出另外一个话题
Job
是什么?
Job
基本上每启动一个协程就会产生对应的Job
,例如
lifecycleScope.launch {
}
launch
返回的就是一个Job
,它可以用来管理协程,一个Job
中可以关联多个子Job
,同时它也提供了通过外部传入parent
的实现
public fun Job(parent: Job? = null): Job = JobImpl(parent)
这个很好理解,当传入parent
时,此时的Job
将会作为parent
的子Job
。
既然Job
是来管理协程的,那么它提供了六种状态来表示协程的运行状态。
-
New
: 创建 -
Active
: 运行 -
Completing
: 已经完成等待自身的子协程 -
Completed
: 完成 -
Cancelling
: 正在进行取消或者失败 -
Cancelled
: 取消或失败
这六种状态Job
对外暴露了三种状态,它们随时可以通过Job
来获取
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
所以如果你需要自己来手动管理协程,可以通过下面的方式来判断当前协程是否在运行。
while (job.isActive) {
// 协程运行中
}
一般来说,协程创建的时候就处在Active
状态,但也有特例。
例如我们通过launch
启动协程的时候可以传递一个start
参数
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
如果这个start
传递的是CoroutineStart.LAZY
,那么它将处于New
状态。可以通过调用start
或者join
来唤起协程进入Active
状态。
下面我们来看一张简图,就能很清晰的了解Job
中的六个状态间的转化过程。
wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
上面已经提及到一个Job
可以有多个子Job
,所以一个Job
的完成都必须等待它内部所有的子Job
完成;对应的cancel
也是一样的。
默认情况下,如果内部的子Job
发生异常,那么它对应的parent Job
与它相关连的其它子Job
都将取消运行。俗称连锁反应。
我们也可以改变这种默认机制,Kotlin
提供了SupervisorJob
来改变这种机制。这种情况还是很常见的,例如用协程请求两个接口,但并不想因为其中一个接口失败导致另外的接口也不请求,这时就可以使用SupervisorJob
来改变协程的这种默认机制。
使用很简单,在我们创建CoroutineContext
的时候加入SupervisorJob
即可。例如在上面提到过的lifecycleScope
,内部就使用到了SupervisorJob
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main
)
你也可以尝试运行下面的这个例子,然后将它的SupervisorJob
替换成别的CoroutineContext
再来看下效果。
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// 启动第二个子作业
val secondChild = launch {
firstChild.join()
// 取消了第一个子作业且没有传播给第二个子作业
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// 但是取消了监督的传播
println("The second child is cancelled because the supervisor was cancelled")
}
}
// 等待直到第一个子作业失败且执行完成
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
}
如果有些任务你并不想被手动取消,可以使用NonCancellable
作为任务的CoroutineContext
如果需要Job
获取协程的返回结果,可以通过Deferred
来实现,它是Job
的一个子类,所以也拥有Job
所用功能。同时额外提供await
方法来等待协程结果的返回。
Deferred
可以通过CoroutineScope.async
创建。
最后我们再来介绍下Job
的几个方法,start
与cancel
就不多说了,分别是启动与取消。
invokeOnCompletion
这个方法是Job
的回调通知,当Job
执行完后会调用这个方法
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
public typealias CompletionHandler = (cause: Throwable?) -> Unit
这个cause
有三种情况分别为:
-
is null
: 协程正常执行完毕 -
is CancellationException
: 协程正常取消,并非异常导致的取消 -
Otherwise
: 协程发生异常
同时它的返回值DisposableHandle
可以用来取消回调的监听。
join
public suspend fun join()
注意这是一个suspend
函数,所以它只能在suspend
或者coroutine
中进行调用。
它的作用是暂停当前运行的协程任务,立刻执行自身Job
的协程任务,直到自身执行完毕之后才恢复之前的协程任务继续执行。
本篇文章主要介绍了CoroutineScope
的作用与Job
的相关状态演化与运用。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。
项目
android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup
的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。
AwesomeGithub: 基于Github
客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin
语言进行开发,项目架构是基于Jetpack&DataBinding
的MVVM
;项目中使用了Arouter
、Retrofit
、Coroutine
、Glide
、Dagger
与Hilt
等流行开源技术。
flutter_github: 基于Flutter
的跨平台版本Github
客户端,与AwesomeGithub
相对应。
android-api-analysis: 结合详细的Demo
来全面解析Android
相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。
daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。