协程的结构化并发:父子关系与取消操作
前言
在并发编程中,我们经常需要使用多个任务来同时处理不同的工作,以提高程序的效率和响应性。然而,并发编程也带来了很多复杂和困难的问题,比如任务之间的同步和协调,资源和内存的管理,错误和异常的处理等。为了解决这些问题,协程提供了一种结构化并发的概念,让我们可以更容易地编写正确和高效的并发代码。
结构化并发是指协程之间具有父子关系,可以通过父协程控制子协程的状态和行为。这样,我们就不需要手动地管理协程的创建和销毁,也不需要担心协程之间的依赖和冲突。结构化并发可以让我们把注意力集中在业务逻辑上,而不是细节上。
一:父子协程
那么,协程之间是如何建立父子关系的呢?答案是通过Job对象。Job对象是一个表示协程执行状态的对象,它有以下几个属性:
- isActive:表示协程是否处于活动状态
- isCompleted:表示协程是否已经完成
- isCancelled:表示协程是否已经取消
- children:表示协程的所有子协程
当我们创建一个协程时,我们可以指定它的父协程,或者使用当前作用域中的默认父协程。这样,就会在Job对象之间建立父子关系。例如:
// 创建一个顶层作用域
val scope = CoroutineScope(Dispatchers.Default)
// 在作用域中创建一个父协程
val parent = scope.launch {
// 在父协程中创建一个子协程
val child = launch {
// 在子协程中执行一些工作
println("Child is working...")
delay(1000)
println("Child is done.")
}
// 等待子协程完成
child.join()
println("Parent is done.")
}
在这个例子中,我们在scope作用域中创建了一个父协程parent,并在parent中创建了一个子协程child。这样,parent就会成为child的父协程,并且child会被添加到parent的children属性中。
与线程相比,协程之间的父子关系有很大的不同。线程之间没有父子关系,线程的结束不受其他线程的影响。例如:
// 创建一个线程
Thread parent = new Thread(() -> {
// 在线程中创建另一个线程
Thread child = new Thread(() -> {
// 在子线程中执行一些工作
System.out.println("Child is working...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Child is done.");
});
// 启动子线程
child.start();
// 等待子线程完成
try {
child.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Parent is done.");
});
// 启动父线程
parent.start();
在这个例子中,我们创建了一个线程parent,并在parent中创建了另一个线程child。虽然我们可以把parent看作是child的父线程,但实际上它们之间没有任何关系。如果parent被中断或者异常终止,child并不会受到任何影响,它会继续执行直到结束。
多个http请求的请求顺序管理
使用父子协程可以让我们更容易地编写复杂的并发逻辑,比如有依赖关系的多个http请求。例如,假设我们需要发起三个http请求,其中一个请求的参数是另外两个请求的结果。我们可以使用父子协程来实现这个需求,如下:
// 创建一个顶层作用域
val scope = CoroutineScope(Dispatchers.IO)
// 在作用域中创建一个父协程
val parent = scope.launch {
// 在父协程中创建两个子协程,分别发起第一个和第二个http请求,并返回结果
val result1 = async { request1() }
val result2 = async { request2() }
// 在父协程中等待两个子协程都完成,并获取结果
val param1 = result1.await()
val param2 = result2.await()
// 在父协程中发起第三个http请求,使用前两个请求的结果作为参数,并获取结果
val result3 = request3(param1, param2)
// 在父协程中处理最终的结果
handleResult(result3)
}
在这个例子中,我们在scope作用域中创建了一个父协程parent,并在parent中创建了两个子协程result1和result2,分别发起第一个和第二个http请求,并返回结果。然后,我们在parent中等待两个子协程都完成,并获取它们的结果。接着,我们在parent中发起第三个http请求,使用前两个请求的结果作为参数,并获取结果。最后,我们在parent中处理最终的结果。
使用父子协程的优点有以下几点:
- 我们可以利用async函数和await方法来实现结构化完成操作,保证协程的执行顺序和结果的一致性。
- 我们可以利用cancel方法或cancelChildren方法来实现结构化取消操作,释放资源和内存,避免内存泄漏和错误。
- 我们可以利用Job对象来控制协程的状态和行为,比如取消或等待某个协程或所有子协程。
- 我们可以利用invokeOnCompletion或invokeOnCancellation方法来注册回调函数,在协程完成或取消时做一些处理,比如传递异常或取消原因给其他协程。
- 我们可以利用SupervisorJob来改变默认的异常传播机制,让一个子协程的失败不会影响其他子协程。
二:父子协程完成操作
有了父子关系,我们就可以利用它来实现一些结构化的并发操作。其中一个操作就是结构化完成操作,也就是说,父协程只有在所有子协程都完成后才算是完成状态。这种操作可以保证协程的执行顺序和结果的一致性。
为了实现结构化完成操作,我们可以使用join方法或者await方法来等待协程的完成。join方法是Job对象的方法,它会挂起当前协程,直到目标协程完成。await方法是Deferred对象的方法,它会挂起当前协程,直到目标协程返回一个结果。Deferred对象是一种特殊的Job对象,它可以表示一个有返回值的协程。
例如:
// 创建一个顶层作用域
val scope = CoroutineScope(Dispatchers.Default)
// 在作用域中创建一个父协程
val parent = scope.launch {
// 在父协程中创建两个子协程
val child1 = async {
// 在子协程中执行一些工作,并返回一个结果
println("Child1 is working...")
delay(1000)
println("Child1 is done.")
1
}
val child2 = launch {
// 在子协程中执行一些工作
println("Child2 is working...")
delay(2000)
println("Child2 is done.")
}
// 等待两个子协程都完成,并获取结果
val result = child1.await()
child2.join()
println("Parent is done. Result is $result.")
}
在这个例子中,我们在scope作用域中创建了一个父协程parent,并在parent中创建了两个子协程child1和child2。child1是一个有返回值的协程,所以我们使用async函数来创建它,并使用await方法来获取它的结果。child2是一个没有返回值的协程,所以我们使用launch函数来创建它,并使用join方法来等待它的完成。这样,parent就会在child1和child2都完成后才结束,并打印出结果。
三:父子协程取消操作
除了结构化完成操作,我们还可以利用父子关系来实现另一个结构化的并发操作,那就是结构化取消操作,也就是说,当父协程取消时,所有子协程也会取消。这种操作可以让我们释放资源和内存,避免内存泄漏和错误。
为了实现结构化取消操作,我们可以使用cancel方法或者cancelAndJoin方法来取消协程。cancel方法是Job对象的方法,它会取消目标协程,并递归地取消所有子协程。cancelAndJoin方法是Job对象的扩展方法,它会先调用cancel方法,然后调用join方法,等待目标协程和所有子协程都完成。
例如:
// 创建一个顶层作用域
val scope = CoroutineScope(Dispatchers.Default)
// 在作用域中创建一个父协程
val parent = scope.launch {
// 在父协程中创建两个子协程
val child1 = launch {
// 在子协程中执行一些工作,并尝试捕获异常
try {
println("Child1 is working...")
delay(1000)
println("Child1 is done.")
} catch (e: CancellationException) {
println("Child1 is cancelled.")
}
}
val child2 = launch {
// 在子协程中执行一些工作,并尝试捕获异常
try {
println("Child2 is working...")
delay(2000)
println("Child2 is done.")
} catch (e: CancellationException) {
println("Child2 is cancelled.")
}
}
// 取消父协程
delay(500)
println("Parent is cancelling...")
cancelAndJoin()
println("Parent is cancelled.")
}
在这个例子中,我们在scope作用域中创建了一个父协程parent,并在parent中创建了两个子协程child1和child2。然后,我们在parent中延迟了500毫秒,然后调用cancelAndJoin方法来取消parent和所有子协程。这样,child1和child2都会抛出CancellationException异常,并打印出取消的信息。
结构化取消操作在Android开发中非常有用,可以通过ViewModelScope来管理协程的生命周期,避免内存泄漏。ViewModelScope是一种特殊的作用域,它会在ViewModel被销毁时自动取消所有协程。例如:
class MyViewModel : ViewModel() {
// 在ViewModelScope中创建一个协程
fun doSomeWork() = viewModelScope.launch {
// 在协程中执行一些工作
println("Work is started...")
delay(1000)
println("Work is done.")
}
}
在这个例子中,我们在MyViewModel类中定义了一个doSomeWork方法,它会在viewModelScope中创建一个协程,并在协程中执行一些工作。当MyViewModel被销毁时,viewModelScope会取消所有协程,释放资源和内存。
四:Job源码分析
Job对象的状态变化为了更深入地理解Job对象是如何实现协程之间的父子关系和结构化并发操作的,我们可以看一下Job的源码。Job是一个接口,它继承自CoroutineContext.Element,也就是说,它是协程上下文的一个元素。Job接口定义了一些属性和方法,如下:
public interface Job : CoroutineContext.Element {
// 协程上下文的Key
public companion object Key : CoroutineContext.Key<Job>
// 协程是否处于活动状态
public val isActive: Boolean
// 协程是否已经完成
public val isCompleted: Boolean
// 协程是否已经取消
public val isCancelled: Boolean
// 协程的所有子协程
public val children: Sequence<Job>
// 取消协程,并传递一个可选的原因
public fun cancel(cause: CancellationException? = null): Boolean
// 等待协程完成,并传递一个可选的原因
public suspend fun join()
// 注册一个回调函数,在协程完成时调用,并返回一个DisposableHandle对象,可以用来取消注册
public fun invokeOnCompletion(
onCancelling: Boolean = false,
invokeImmediately: Boolean = true,
handler: CompletionHandler): DisposableHandle
// 注册一个回调函数,在协程取消时调用,并返回一个DisposableHandle对象,可以用来取消注册
public fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle
// 附加一个子协程到当前协程,并返回一个ChildHandle对象,可以用来取消附加
public fun attachChild(child: Job): ChildHandle
// 获取协程的取消原因,如果没有则返回null
public fun getCancellationException(): CancellationException
}
从这些属性和方法中,我们可以看出,Job对象提供了一些基本的功能,比如:
- 判断协程的状态(isActive, isCompleted, isCancelled)
- 取消或等待协程(cancel, join)
- 注册或取消回调函数(invokeOnCompletion, invokeOnCancellation)
- 附加或取消子协程(attachChild)
这些功能都是为了实现结构化并发操作所必需的。但是,Job接口只是定义了一些抽象的方法,并没有具体的实现。那么,具体的实现是在哪里呢?
答案是在JobImpl类中。JobImpl类是Job接口的一个实现类,它继承自AbstractCoroutine类,并实现了SelectClause0接口。AbstractCoroutine类是一个抽象类,它封装了一些协程的基本逻辑和状态机制。SelectClause0接口是一个选择器子句接口,它允许我们在select表达式中使用join方法。JobImpl类的代码如下:
internal open class JobImpl(parent: Job? = null) : AbstractCoroutine<Unit>(parent, true), Job, SelectClause0 {
override val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
override fun handlesException(): Boolean = true
override suspend fun join() {
if (!joinInternal()) {
coroutineContext.cancelChildren()
throw JobCancellationException("Job was cancelled", null, this)
}
}
override fun getCompletionException(): CancellationException =
state.let { state ->
when (state) {
is Finishing -> state.rootCause?.toCancellationException("Job is finishing normally", this)
is CompletedExceptionally -> state.cause.toCancellationException("Job was cancelled normally", this)
else -> error("Job is still active or completing: $state")
}
}
override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler): DisposableHandle {
var nodeCache: JobNode<*>? = null
while (true) {
val state = this.state
when (state) {
is Empty -> {
if (state.isActive) {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state)
}
is Incomplete -> {
val list = state.list
if (list == null) {
promoteSingleToNodeList(state as JobNode<*>)
} else {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
else -> {
if (invokeImmediately) invokeHandler(handler, state, this)
return NonDisposableHandle
}
}
}
}
override fun invokeOnCancellation(handler: CompletionHandler): DisposableHandle {
var nodeCache: JobCancellationNode<*>? = null
while (true) {
val state = this.state
when (state) {
is Empty -> {
if (state.isActive) {
val node = nodeCache ?: JobCancellationNode(this, handler).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state)
}
is Incomplete -> {
val list = state.list
if (list == null) {
promoteSingleToNodeList(state as JobNode<*>)
} else {
val node = nodeCache ?: JobCancellationNode(this, handler).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
is CompletedExceptionally -> {
handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
else -> return NonDisposableHandle // must be active or cancelled normally
}
}
}
override fun attachChild(child: Job): ChildHandle {
var handleCache: ChildHandleNode? = null
while (true) {
val state = this.state
when (state) {
is Empty -> {
if (state.isActive) {
val handle =
handleCache ?: ChildHandleNode(this, child).also { handleCache = it }
if (_state.compareAndSet(state, handle)) return handle
} else
promoteEmptyToNodeList(state)
}
is Incomplete -> {
val list = state.list
if (list == null) {
promoteSingleToNodeList(state as JobNode<*>)
} else {
val handle =
handleCache ?: ChildHandleNode(this, child).also { handleCache = it }
if (addLastAtomic(state, list, handle)) return handle
}
}
else -> return NonDisposableHandle // must be complete and cannot add child to it anymore
}
}
}
override fun cancel(cause: CancellationException?) {
cancelImpl(cause)
}
override fun cancelChildren(cause: CancellationException?) {
this.state.let { state ->
when {
state !is Incomplete -> return // fast path when there is no list of children
cause != null && cause !is CancellationException -> cancelChildrenInternal(state.list, cause)
else -> cancelChildrenInternal(state.list)
}
}
}
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (!suppressed && mode != MODE_IGNORE && state is CompletedExceptionally && handlesException()) {
cancelParent(cause = state.cause)
}
}
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
cancelChildrenInternal(exceptionally?.cause)
}
override fun toString(): String =
"${nameString()}{${stateString(state)}}@$hexAddress"
protected open fun nameString(): String =
"Job"
}
从这些代码中,我们可以看出,JobImpl类主要做了以下几件事:
-
定义了一个_state字段,用来存储协程的状态,它是一个原子引用,可以保证线程安全。协程的状态可以是以下几种类型:
- Empty:表示协程刚刚创建,还没有任何子协程或回调函数,它有一个isActive属性,表示协程是否处于活动状态。
- JobNode:表示协程有一个子协程或回调函数,它继承自Job接口
- NodeList:表示协程有多个子协程或回调函数,它是一个双向链表,可以存储多个JobNode对象
- Incomplete:表示协程还没有完成,它是一个接口,有一个list属性,表示协程的子协程或回调函数列表,可以是null、JobNode或NodeList
- Finishing:表示协程正在完成,它是Incomplete接口的一个实现类,有一个rootCause属性,表示协程的取消原因
- CompletedExceptionally:表示协程已经异常完成,它有一个cause属性,表示协程的异常原因
- Any:表示协程已经正常完成,它可以是任何类型的值,表示协程的返回值
-
实现了join方法,用来等待协程的完成。join方法会调用joinInternal方法,该方法会检查协程的状态,如果协程已经完成,则直接返回true;如果协程还没有完成,则会注册一个回调函数,在协程完成时唤醒当前协程,并挂起当前协程;如果协程被取消,则会抛出一个JobCancellationException异常,并取消所有子协程。
-
实现了invokeOnCompletion和invokeOnCancellation方法,用来注册回调函数,在协程完成或取消时调用。这两个方法都会根据协程的状态进行不同的处理。如果协程还没有任何子协程或回调函数,则会创建一个JobNode对象,并把它作为新的状态;如果协程已经有一个子协程或回调函数,则会创建一个NodeList对象,并把它作为新的状态,并把原来的JobNode对象添加到NodeList中;如果协程已经有多个子协程或回调函数,则会创建一个新的JobNode对象,并把它添加到NodeList中;如果协程已经完成或取消,则会直接调用回调函数,并返回一个NonDisposableHandle对象。这两个方法都会返回一个DisposableHandle对象,可以用来取消注册。
-
实现了attachChild方法,用来附加一个子协程到当前协程,并返回一个ChildHandle对象,可以用来取消附加。这个方法也会根据协程的状态进行不同的处理。如果协程还没有任何子协程或回调函数,则会创建一个ChildHandleNode对象,并把它作为新的状态;如果协程已经有一个子协程或回调函数,则会创建一个NodeList对象,并把它作为新的状态,并把原来的JobNode对象和新创建的ChildHandleNode对象添加到NodeList中;如果协程已经有多个子协程或回调函数,则会创建一个新的ChildHandleNode对象,并把它添加到NodeList中;如果协程已经完成,则会返回一个NonDisposableHandle对象。
-
实现了cancel和cancelChildren方法,用来取消当前协程和所有子协程,并传递一个可选的原因。cancel方法会调用cancelImpl方法,该方法会检查当前状态是否可以被取消(即是否是Incomplete类型),如果可以,则会创建一个CompletedExceptionally对象,并把它作为新的状态,并通知所有子协程和回调函数;如果不可以,则什么也不做。cancelChildren方法会遍历当前状态中的所有子协程(即ChildHandleNode类型),并调用它们的dispose方法来取消它们。
-
实现了onCompletionInternal和onCancellationInternal方法,用来在当前状态发生变化时做一些处理。onCompletionInternal方法会在当前状态变为完成时被调用,它会检查当前状态是否是异常完成(即CompletedExceptionally类型),如果是,并且当前协程可以处理异常(即handlesException方法返回true),则会调用cancelParent方法,把异常原因传递给父协程。onCancellationInternal方法会在当前状态变为取消时被调用,它会调用cancelChildrenInternal方法,把取消原因传递给所有子协程。
从上面的分析中,我们可以看出,JobImpl类主要通过以下几种方式来实现协程之间的父子关系和结构化并发操作:
- 通过_state字段来存储和更新协程的状态,以及协程的子协程和回调函数列表
- 通过JobNode、NodeList、ChildHandleNode等类来封装不同类型的子协程和回调函数,并提供一些操作方法
- 通过invokeOnCompletion、invokeOnCancellation、attachChild等方法来注册或取消子协程和回调函数,并根据协程的状态进行不同的处理
- 通过join、cancel、cancelChildren等方法来等待或取消当前协程和所有子协程,并根据协程的状态进行不同的处理
- 通过onCompletionInternal、onCancellationInternal等方法来在协程的状态发生变化时做一些处理,比如传递异常或取消原因给父协程或子协程
这样,我们就可以利用Job对象来方便地控制协程的状态和行为,实现结构化并发操作。
总结
协程的结构化并发是通过Job对象为我们建立的父子关系,让我们可以方便地控制协程的状态和行为。我们可以使用join方法或者await方法来实现结构化完成操作,保证协程的执行顺序和结果的一致性。我们也可以使用cancel方法或者cancelAndJoin方法来实现结构化取消操作,释放资源和内存,避免内存泄漏和错误。协程的结构化并发可以让我们更容易地编写正确和高效的并发代码,避免错误和内存泄漏。