Android开发经验谈Android开发Android技术知识

Kotlin 协程入门

2019-03-27  本文已影响21人  ddu_

协程是什么

从使用的角度来说,协程可以简单理解为轻量化的线程,比线程更低一级的代码执行单元。

准备工作

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
//针对 android 项目
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"

为什么需要协程

首先看一下,客户端从服务器获取数据的一般流程:

理想的操作是这样的:

fun requestToken():Token {...}
fun createPost(token: Token, item: Item): Post {...}
fun processPost(post: Post)

fun postItem(item: Item) {
    val token = requestToken()
    val post = createPost(token, item)
    processPost(post)
}

现实是残酷的,对于 Android 平台而言,获取 token 和数据的操作是耗时的,会阻塞主线程,导致页面无响应。

刚开始我们使用新线程 + 回调 来解决这个问题:

//启动新线程发起数据请求,当数据获取到后调用回调函数,必要的情况下,获取到数据后可以切换到主线程
fun requestToken(cb: (Token) -> Unit) {...}
fun createPost(token: Token, item: Item, cb: (Post) -> Unit) {...}
fun processPost(post: Post) {...}

fun postItem(item: Item) {
    requestToken { token ->
        createPost(token, item) { post ->
            processPost(post)
        }
    }
}

问题来了,过多的回调使得代码变得冗长而难以阅读,这里使用了 lamada 表达式使得情况稍微好点,不用 lamada,再来个四五层的回调,会疯的。

很多人都受不了了,陆续出了三个方案来解决这个问题:

他们本质是一样的,通过数据的封装来减少回调:

 fun requestToken():Promise<Token> {...}
 fun createPost(token: Token, item: Item): Promise<Token> {...}
 fun processPost(post: Post)

 fun postItem(item: Item) {
     requestToken()
        .thenCompose { token -> createPost(token, item) }
        .thenAccept {post -> processPost(post)}
 }
fun requestToken(): Token { ... }
fun createPost(token: Token, item: Item): Post { ... }
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
    Single.fromCallable { requestToken() }
            .map { token -> createPost(token, item) }
            .subscribe(
                    { post -> processPost(post) }, // onSuccess
                    { e -> e.printStackTrace() } // onError
            )
}

问题大为改观,巴特, 为了使用这些库你需要记忆诸如 thenCompose, thenAccept 这样的操作符,不同的库,操作符可能名字还不相同。新功能的增加往往伴随操作符的增加,给使用者带来的极大的学习负担。我大概 16 年开始使用 Rxjava,Rxjava 的操作符到现在我还没彻底搞清楚。可能是我太懒了吧。

最后我们看下,协程在这个问题的解决上究竟有多香!

suspend fun requestToken():Token {...}
suspend fun createPost(token: Token, item: Item): Post {...}
suspend fun processPost(post: Post)

suspend fun postItem(item: Item) {
    val token = requestToken()
    val post = createPost(token, item)
    processPost(post)
}

无限接近,最开始提出的理想方案———顺序执行。

协程是如何做到的如此 nb 的 !?

如何启动协程

线程 -> 协程

我们的代码默认执行在线程环境下,runBlocking 像一个转换器,把线程环境转换为协程环境。

import kotlinx.coroutines.*

fun main() { 
    println("Hello,") 
    runBlocking { // 线程 -> 协程    
        delay(2000L) //main 线程阻塞两秒
    } 
}

runBlocking 还可以直接启动一个 main 协程:

import kotlinx.coroutines.*

fun main() = runBlocking {
    println("Hello") 
}

通过 CoroutineScope 接口的扩展方法 launch 可以启动一个协程。因为 CoroutineScope 是一个接口,不能直接调用其扩展方法,需要定义 CoroutineScope 接口的实现类才能使用。协程库提供了一个 CoroutineScope 的实现 GlobalScope。

import kotlinx.coroutines.*

fun main() {
    //启动协程,就像启动一个线程一样,在后台运行
    GlobalScope.launch { 
        delay(1000L) 
        println("World!") 
    }
    println("Hello,") 
    //主线程不 sleep,则不会打印 World,程序直接结束,不会等待协程执行完毕
    Thread.sleep(2000L) 
}

我们也可以自己写 CoroutineScope 的实现类。实际上,不需要这么麻烦,协程库给我们提供了 CoroutineScope 方法来快速实现一个 CoroutineScope 对象。

val uiScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
val computeScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
        
uiScope.launch {
    delay(1000)
}
        
ioScope.launch { 
    delay(1000)
}
        
computeScope.launch { 
    delay(1000)
}

这里,Dispatchers.Main,Dispatchers.IO,Dispatchers.Default 用于指定协程运行的线程环境:

协程 -> 协程

我们也可以在协程环境下启动一个新的协程

通过 launch,async 在协程中启动协程:

fun main() {
    runBlocking {
        val job:Job = launch {
            delay(1000)
            println("子协程1执行完毕")
        }

        val deferred:Deferred<Int> = async {
            delay(1200)
            println("子协程2执行完毕")
            3 //协程的返回值
        }

        delay(100)
        println("父线程执行完毕") 
        //父协程等待子协程结束
    }
}

launch 方法返回一个 Job 对象,async 返回一个 Deferred 对象,Deferred 是带返回值的,可以通过 deferred.await() 获取到返回值。

另外需要注意的是,协程内部启动的协程称为子协程,父协程会等待子协程执行完后才会结束,而不会直接结束。这里可以和之前的 GlobalScope 对比一下。

coroutineScope 也可以在协程环境下启动一个新的协程,通常用于完成并行任务。

import kotlinx.coroutines.*

fun main() = runBlocking { 

    coroutineScope { 
        launch {
            delay(500L) 
            println("Task from nested launch")
        }
    
        delay(100L)
        println("Task from coroutine scope") 
    }
    //coroutineScope 执行完了,才会执行后续代码
    println("Coroutine scope is over")
}

挂起 vs 阻塞 (suspend vs block)

在线程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一线程后续代码,我们称该操作 阻塞(block) 了线程。

在协程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一协程后续代码,我们称该操作 挂起(suspend) 了协程。

就是多了个叫法,方便区分。

kotlin 中有一个关键字 suspend,用于修饰方法。

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

用于启动协程的 coroutineScope 就是一个 suspend 方法。所以,coroutineScope 执行完了,才会执行后续代码。

原理

以上就是协程最基础的部分。了解一下协程的工作原理:

查看下源码:

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job


public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

可以看出,启动协程时,我们传入的 block 都是 CoroutineScope 的扩展函数:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

CoroutineScope 有一个成员变量 coroutineContext,在协程中我们都可以
访问到这个成员变量

fun main() {
    GlobalScope.launch {
        println(coroutineContext)
    }
    Thread.sleep(1000)
}

CoroutineContext 是一个接口,功能类似于 Map,用于保存 key-value 型数据。

public interface CoroutineContext {
    //'Map' 中的 key,E 是对于 Value 的类型
    public interface Key<E : Element>
    //'Map' 中的 value
    public interface Element : CoroutineContext {
        
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
   
    //根据 key 获取 value,重载操作符
    public operator fun <E : Element> get(key: Key<E>): E?
    //遍历 ‘map’,进行累积操作
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    //重载操作符 ‘+’ , 用于将两个 'map' 合并
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    //从 ‘map’ 中删除键值对
    public fun minusKey(key: Key<*>): CoroutineContext
}

CoroutineContext 表示协程工作的上下文。主要包含了下面几类对象:

通过重载操作符我们都可以访问到这些对象

fun main() {
    GlobalScope.launch {
        val job = coroutineContext[Job]
        println(job)
        val  continnuation = coroutineContext[ContinuationInterceptor]
        println(continnuation)
        val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
        println(exceptionHandler)
        val name = coroutineContext[CoroutineName]
        println(name)
    }
    Thread.sleep(1000)
}

小结一下:

每个用于启动协程的 block 都是 CoroutineScope 接口的扩展方法,都继承了 coroutineContext 成员变量。CoroutineScope 用于定义协程的作用域(scope),
coroutineContext 表示协程工作的上下文,类似一个map,保存协程作用域中的重要对象:

从源代码可以看出 CoroutineContext 相对于 Map 的几点优势:

我从协库里找了点代码(格式上稍作修改),来看看 CoroutineContext 到底有多 nice:

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    //合并两个 CoroutineContext,后者权重更高,就是说,如果两个 context 内部有相同的 key,取加号右边的 value
    val combined = coroutineContext + context
    val debug = if (DEBUG) {
        //CoroutineContext 与 Element 合并,加号右侧权重更高
        combined + CoroutineId(COROUTINE_ID.incrementAndGet())
    }  else {
        combined
    }
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) {
        //CoroutineContext 与 Element 合并,加号右侧权重更高
        debug + Dispatchers.Default
    } else {
        debug
    } 
}

最后,再来看看协程的启动的一些细节

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

通过 launch 启动的协程,其上下文对象 CoroutineContext 由父协程的 CoroutineContext 对象和 launch 方法的第一个参数共同决定。具体规则如下图所示:

协程原理.png

新协程context = 父协程context + 参数 context + child Job

启动过程中,会创建一个新的 Job 对象 child Job,父协程的 job 对象是它的 parent。内部是通过 attachChild 方法来确定 job 之间的关系的。

线程切换

在启动一个线程时我们可以指定一个线程:

//在 main 线程之上运行协程
launch(Dispatchers.Main) {
       
}

也可通过 withContext 方法切换线程

launch(Dispatchers.Main) {
    withContext(Dispatchers.IO) {

    }
}

用于指定线程的参数包括:

fun main() {

    GlobalScope.launch {
        launch(Dispatchers.Unconfined) {
            println("1 I'm working in thread ${Thread.currentThread().name}")
            withContext(Dispatchers.IO) {
                println("2 I'm working in thread ${Thread.currentThread().name}")
            }
            println("3 I'm working in thread ${Thread.currentThread().name}")
        }
    }

    Thread.sleep(1000)
}

输出:

1 I'm working in thread DefaultDispatcher-worker-1
2 I'm working in thread DefaultDispatcher-worker-2
3 I'm working in thread DefaultDispatcher-worker-2

协程方法

join

协程的 join 方法和线程的 join 方法类似,都是让出当前执行权,让其它协程先执行。

import kotlinx.coroutines.*

fun main() = runBlocking {
    //返回一个 job 对象
    val job = GlobalScope.launch { 
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join()  //外部也成让出执行权,执行 job 协程
    //job 协程执行完后,才会执行后续代码
    println("Hello,")
}

yiedld

用于让出当前线程,让其它协程执行。如果协程是通过 Dispatchers.Unconfined 启动的,yiedld 方法什么都不做。

fun main() {
    GlobalScope.launch {
        yield()
    }

    Thread.sleep(1000)
}

协程的退出

可以通过协程的 cancel 方法来退出一个可退出的(cancellable)协程。

那什么是可退出(cancellable)的协程呢?有两种情况:

情况一:协程中耗时任务都来自协程库中的 suspend 方法。

val job = launch {
    repeat(1000) { i ->
            println("I'm sleeping $i ...")
            //程库中的 suspend 方法
            delay(500L)
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion 
println("main: Now I can quit.")

协程库中的 suspend 方法在中断时都会抛出 CancellationException 异常,这个异常主要用于调试,通常无需单独处理。

情况二:使用 isActive 决定协程是否继续执行

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

协程调用 cancle 后,isActive 会被置为 false。

有的时候,我们希望在协程被中断后做一些清理工作,可以使用 try finally

val job = launch {
    try {
        repeat(1000) { i ->
                println("I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

finally 中是不能调用 suspend 方法,虽然在 finally 中调用 suspend 情况很少,但是还是可以通过 withContext(NonCancellable) {...} 来实现

val job = launch {
    try {
        repeat(1000) { i ->
                println("I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("I'm running finally")
            delay(1000L)
            println("And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

异常处理

通过 launch 启动的协程使用 CoroutineExceptionHandler 处理异常

val handler = CoroutineExceptionHandler { _, exception -> 
        println("Caught $exception") 
}
val job = GlobalScope.launch(handler) {
    throw AssertionError()
}

job.join()

通过 async 启动的协程使用 try catch 处理异常

val deferred = GlobalScope.async {
    println("Throwing exception from async")
    throw ArithmeticException() // Nothing is printed, relying on user to call await
}

try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }

当协程抛出异常时,协程的执行会结束,同时协程会将这个异常传递给父协程,父协程的其他子协程及父协程的执行也会结束。

val handler = CoroutineExceptionHandler { _, exception -> 
        println("Caught $exception") 
}
val job = GlobalScope.launch(handler) {
    launch { // the first child
        try {
            delay(Long.MAX_VALUE)
        } finally {
            withContext(NonCancellable) {
                println("Children are cancelled, but exception is not handled until all children terminate")
                delay(100)
                println("The first child finished its non cancellable block")
            }
        }
    }
    launch { // the second child
        delay(10)
        println("Second child throws an exception")
        throw ArithmeticException()
    }
}
job.join()

但是,有一个特殊的异常 CancellationException,它只会导致抛出该异常的协程结束,不会传递给其他协程。

有的时候,我们希望抛出异常时,可以单独结束一个协程。这时候可以使用 Supervision job

import kotlinx.coroutines.*

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("First child is failing")
            throw AssertionError("First child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                println("Second child is cancelled because supervisor is cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        println("Cancelling supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

android 实用代码

这里是协程库中注释给出的代码:

 class MyActivity : AppCompatActivity(), CoroutineScope {
      lateinit var job: Job
      override val coroutineContext: CoroutineContext
          get() = Dispatchers.Main + job
 
      override fun onCreate(savedInstanceState: Bundle?) {
          super.onCreate(savedInstanceState)
          job = Job()
      }
 
      override fun onDestroy() {
          super.onDestroy()
          job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
      }
 
      /*
       * Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
       * in this method throws an exception, then all nested coroutines are cancelled.
       */
      fun loadDataFromUI() = launch { // <- extension on current activity, launched in the main thread
         val ioData = async(Dispatchers.IO) { // <- extension on launch scope, launched in IO dispatcher
             // blocking I/O operation
         }
         // do something else concurrently with I/O
         val data = ioData.await() // wait for result of I/O
         draw(data) // can draw in the main thread
      }
  }

我觉得可以稍加改进下:

class MainScope : CoroutineScope, LifecycleObserver {

    private val job = SupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = job + Dispatchers.Main

    @OnLifecycleEvent(Lifecycle.Event.ON_PAUSE)
    fun destroy() = coroutineContext.cancelChildren()
}
// usage
class MainFragment : Fragment() {
    private val uiScope = MainScope()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        lifecycle.addObserver(mainScope)
    }

    private fun loadData() = uiScope.launch {
        val result = withContext(bgDispatcher) {
            // your blocking call
        }
    }
}

参考资料

上一篇 下一篇

猜你喜欢

热点阅读