Androidandroid 时光机kotlin

kotlin - Coroutine 协程

2019-03-28  本文已影响3122人  前行的乌龟

我是在深入学习 kotlin 时第一次看到协程,作为传统线程模型的进化版,虽说协程这个概念几十年前就有了,但是协程只是在近年才开始兴起,应用的语言有:go 、goLand、kotlin、python , 都是在支持协程的,可能不同平台 API 上有差异

首次学习协程费些时间,携程和 thread 类似,但是和 thread u有很大区别,搞懂,学会,熟悉协程在线程上如何运作是要钻研一下的,上手可能不是那么快

这里有个很有建设性意见的例子,Coroutines 代替 rxjava


怎么理解协程

我们先来把协程这个概念搞懂,不是很好理解,但是也不难理解

协程 - 也叫微线程,是一种新的多任务并发的操作手段(当年也不是很新,概念早就有了)

下面是关于协程这个概念的一些描述:

协程的开发人员 Roman Elizarov 是这样描述协程的:协程就像非常轻量级的线程。线程是由系统调度的,线程切换或线程阻塞的开销都比较大。而协程依赖于线程,但是协程挂起时不需要阻塞线程,几乎是无代价的,协程是由开发者控制的。所以协程也像用户态的线程,非常轻量级,一个线程中可以创建任意个协程。

Coroutine,翻译成”协程“,初始碰到的人马上就会跟进程和线程两个概念联系起来。直接先说区别,Coroutine是编译器级的,Process和Thread是操作系统级的。Coroutine的实现,通常是对某个语言做相应的提议,然后通过后成编译器标准,然后编译器厂商来实现该机制。Process和Thread看起来也在语言层次,但是内生原理却是操作系统先有这个东西,然后通过一定的API暴露给用户使用,两者在这里有不同。Process和Thread是os通过调度算法,保存当前的上下文,然后从上次暂停的地方再次开始计算,重新开始的地方不可预期,每次CPU计算的指令数量和代码跑过的CPU时间是相关的,跑到os分配的cpu时间到达后就会被os强制挂起。Coroutine是编译器的魔术,通过插入相关的代码使得代码段能够实现分段式的执行,重新开始的地方是yield关键字指定的,一次一定会跑到一个yield对应的地方

对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。协程能保留上一次调用时的状态,不需要像线程一样用回调函数,所以性能上会有提升。缺点是本质是个单线程,不能利用到单个CPU的多个核

协程和线程的对比:

程序员能够控制协程的切换,通过yield让协程在空闲(比如等待io,网络数据未到达)时放弃执行权,通过resume调度协程运行。协程一旦开始运行就不会结束,直到遇到yield交出执行权。Yield和resume这一对控制可以比较方便地实现程序之间的“等待”需求,即“异步逻辑”。总结起来,就是协程可以比较方便地实现用同步的方式编写异步的逻辑。

比如 python 里的这段代码,在同一个线程里运行下面2个方法

def A():
    print '1'
    print '2'
    print '3'

def B():
    print 'x'
    print 'y'
    print 'z'

假设由协程执行,每个方法都用协程标记,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:1 2 x y 3 z

然后再说下现在的手机都是多核心的,采用多个大小核心封装处理,基本都可以多核心同时处理任务,所以协程非常符合时下多核心 CPU 的处理逻辑,采用 CPU 核心 1/2 - 1 倍之间的线程数,然后配个协程执行任务就能最好的兼顾多线程的性能了。rxjava captrue 专门用来计算的线程调度器多线程数就是在 cpu 核心数量的 1/2 - 1 之间,就是基于这种考虑,较少线程切换带来的损失,并发和串行之间的性能对比在加法计算中,只有百万次之上的时候,并发才用有更好的性能这就能看出线程切换带来的性能损耗有多大,这才是2个线程组成并发的情况

当然要是碰到老式 MTK cpu 这种假多核那就没办法了



添加依赖

在 module 项目中添加下面的依赖:

    kotlin{
        experimental {
            coroutines 'enable'
        }
    }

    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1'

kotlin Coroutine 部分在最近几个版本变化较大,推荐大家使用 kotlin 的最新版本 1.3.21,同时 kotlin 1.3.21 版本 kotlin-stdlib-jre7 支持库更新为 kotlin-stdlib-jdk7

buildscript {
    ext.kotlin_version = '1.3.21'
    ......
}
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"

kotlin 中协程涉及的概念和 API

1. 协程概念

文章开头已经介绍了协程的概念,但毕竟平台不同,也许多协程也有不一样的地方,我们还是看看 kotlin 中的协程的描述,下面来自官方文档:

协程通过将复杂性放入库来简化异步编程。程序的逻辑可以在协程中顺序地表达,而底层库会为我们解决其异步性。该库可以将用户代码的相关部分包装为回调、订阅相关事件、在不同线程(甚至不同机器)上调度执行,而代码则保持如同顺序执行一样简单

协程的开发人员 Roman Elizarov 是这样描述协程的:协程就像非常轻量级的线程。线程是由系统调度的,线程切换或线程阻塞的开销都比较大。而协程依赖于线程,但是协程挂起时不需要阻塞线程,几乎是无代价的,协程是由开发者控制的。所以协程也像用户态的线程,非常轻量级,一个线程中可以创建任意个协程

总结下,协程是跑在线程上的,一个线程可以同时跑多个协程,每一个协程代表一个异步任务,我们通过全手动来控制多个线程之间的运行,决定谁什么时候挂起,什么时候运行,什么时候唤醒,而不是 Thread 那样交给系统内核来操作,去竞争 CPU 时间段

协程在线程中是顺序执行的,既然是顺序执行的那怎么实现异步的效果,这自然是有手段的,Thread 中我们有阻塞,唤醒的概念,协程同样也有,挂起等同于阻塞,区别是 Thread 的阻塞是阻塞当前线程的(此时线程只能空耗 cpu 时间而不能执行计算任务,是种浪费),而协程的阻塞可以不阻塞线程,线程在正在执行人物的协程阻塞时,会继续计算任务,执行下面的协程,通过这样的手段来实现多线程,异步的效果,在思维逻辑上同 Thread 有较大区别,需要适应思路

suspend 这个关键字表示可挂起,kotlin 的协程中的部分函数要求锁调用的方法是可以挂起的,必须用 suspend 关键字修饰这个方法,被 suspend 修饰的方法只能在协程中使用或是另一个 suspend 修饰的方法

suspend fun requestToken(): Token { ... }   // 挂起函数
suspend fun createPost(token: Token, item: Item): Post { ... }  // 挂起函数

fun postItem(item: Item) {
    GlobalScope.launch { // 创建一个新协程
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
        // 需要异常处理,直接加上 try/catch 语句即可
    }
}

2. kotlin 协程创建 API

kotlin 里没有 new ,自然也不像 JAVA 一样 new Thread,另外 kotlin 里面提供了大量的高阶函数,所以不难猜出协程这里 kotlin 也是有提供专用函数。kotlin 中 GlobalScope 类提供了几个携程构造函数:

kotlin 在 1.3 之后要求协程必须由 CoroutineScope 创建,CoroutineScope 不阻塞当前线程,在后台创建一个新协程,也可以指定协程调度器。比如 CoroutineScope.launch{} 可以看成 new Thread,就是这个意思,区别是一个是协程,一个是线程

来看一个最简单的例子:

    Log.d("AA", "协程初始化开始,时间: " + System.currentTimeMillis())
    GlobalScope.launch(Dispatchers.Unconfined) {
        Log.d("AA", "协程初始化完成,时间: " + System.currentTimeMillis())
        for (i in 1..3) {
            Log.d("AA", "协程任务1打印第$i 次,时间: " + System.currentTimeMillis())
        }
        delay(500)
        for (i in 1..3) {
            Log.d("AA", "协程任务2打印第$i 次,时间: " + System.currentTimeMillis())
        }
    }
    Log.d("AA", "主线程 sleep ,时间: " + System.currentTimeMillis())
    Thread.sleep(1000)
    Log.d("AA", "主线程运行,时间: " + System.currentTimeMillis())
    for (i in 1..3) {
        Log.d("AA", "主线程打印第$i 次,时间: " + System.currentTimeMillis())
    }
03-28 14:00:16.028 5436-5436/com.bloodcrown.kotlintest D/AA: 协程初始化开始,时间: 1553752816027
03-28 14:00:16.060 5436-5436/com.bloodcrown.kotlintest D/AA: 协程初始化完成,时间: 1553752816060
03-28 14:00:16.060 5436-5436/com.bloodcrown.kotlintest D/AA: 协程任务1打印第1 次,时间: 1553752816060
03-28 14:00:16.060 5436-5436/com.bloodcrown.kotlintest D/AA: 协程任务1打印第2 次,时间: 1553752816060
03-28 14:00:16.060 5436-5436/com.bloodcrown.kotlintest D/AA: 协程任务1打印第3 次,时间: 1553752816060
03-28 14:00:16.063 5436-5436/com.bloodcrown.kotlintest D/AA: 主线程 sleep ,时间: 1553752816063
03-28 14:00:16.567 5436-5519/com.bloodcrown.kotlintest D/AA: 协程任务2打印第1 次,时间: 1553752816567
03-28 14:00:16.567 5436-5519/com.bloodcrown.kotlintest D/AA: 协程任务2打印第2 次,时间: 1553752816567
03-28 14:00:16.568 5436-5519/com.bloodcrown.kotlintest D/AA: 协程任务2打印第3 次,时间: 1553752816567
03-28 14:00:17.068 5436-5436/com.bloodcrown.kotlintest D/AA: 主线程运行,时间: 1553752817067
03-28 14:00:17.068 5436-5436/com.bloodcrown.kotlintest D/AA: 主线程打印第1 次,时间: 1553752817068
03-28 14:00:17.068 5436-5436/com.bloodcrown.kotlintest D/AA: 主线程打印第2 次,时间: 1553752817068
03-28 14:00:17.068 5436-5436/com.bloodcrown.kotlintest D/AA: 主线程打印第3 次,时间: 1553752817068
这几个函数基本点都一样,我们以 launch 函数为例
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

可以看到啊 launch 是个扩展函数,接受3个参数,前面2个是常规参数,最后一个是个对象式函数,这样的话 kotlin 就可以使用以前说的闭包的写法:() 里面写常规参数,{} 里面写函数式对象的实现,就像上面的例子一样,刚从 java 转过来的朋友看着很别扭不是,得适应

 GlobalScope.launch(Dispatchers.Unconfined) {...}

我们需要关心的是 launch 的3个参数和返回值 Job

            val singleThreadContext = newSingleThreadContext("aa")
            GlobalScope.launch(singleThreadContext) { ... }
var job:Job = GlobalScope.launch( start = CoroutineStart.LAZY ){
    Log.d("AA", "协程开始运行,时间: " + System.currentTimeMillis())
}

Thread.sleep( 1000L )
// 手动启动协程
job.start()

3. GlobalScope.async

async 同 launch 唯一的却别就是 async 是可以有返回的,看下面的例子:

            GlobalScope.launch(Dispatchers.Unconfined) {
                val deferred = GlobalScope.async{
                    delay(1000L)
                    Log.d("AA","This is async ")
                    return@async "taonce"
                }
                Log.d("AA","协程 other start")
                val result = deferred.await()
                Log.d("AA","async result is $result")
                Log.d("AA","协程 other end ")
            }
            Log.d("AA", "主线程位于协程之后的代码执行,时间:  ${System.currentTimeMillis()}")

async 返回的是 Deferred 类型,Deferred 继承自 Job 接口,Job有的它都有,增加了一个方法 await ,这个方法接收的是 async 闭包中返回的值,async 的特点是不会阻塞当前线程,但会阻塞所在协程,也就是挂起

但是注意啊,async 并不会阻塞线程,只是阻塞锁调用的协程

4. runBlocking

runBlocking 和 launch 区别的地方就是 runBlocking 的 delay 方法是可以阻塞当前的线程的,和Thread.sleep() 一样,看下面的例子:

  fun main(args: Array<String>) {
      runBlocking {
          // 阻塞1s
          delay(1000L)
          println("This is a coroutines ${TimeUtil.getTimeDetail()}")
      }
      // 阻塞2s
      Thread.sleep(2000L)
      println("main end ${TimeUtil.getTimeDetail()}")
  }
  // 输出
  // This is a coroutines 11:00:51
  // main end 11:00:53

runBlocking 通常的用法是用来桥接普通阻塞代码和挂起风格的非阻塞代码,在r unBlocking 闭包里面启动另外的协程,协程里面是可以嵌套启动别的协程的


协程的挂起和恢复

前面说过,协程的特点就是多个协程可以运行在一个线程内,单个协程在进入 IO 阻塞时不会阻塞当前线程,线程还可以继续执行其他任务。学习协程最大的难点是搞协程是如何运行的,合适挂起,合适恢复,多个协程之间的组织运行,协程和线程之间的组织运行

1. 协程在不挂起时, 协程和协程,协程和线程内代码是顺序运行的

这点是和 thread 最大的不同,thread 线程采取竞争 cpu 时间段的方法运行,谁抢到谁运行,由系统内核控制,对我们是不可见不可控的。协程不同,协程之间不用竞争,谁运行,谁挂起,什么时候恢复都是由我们自己控制的

最简单的协程运行模式,不涉及挂起时,谁写在前面谁先运行,后面的等前面的协程运行完之后再运行。涉及到挂起时,前面的协程挂起了,那么线程不会空闲,而是继续运行下一个协程,而前面挂起的那个协程在挂起结速时不会马上运行,而是等待当前正在运行的协程运行完毕后再去执行

下面这个就是协程顺序执行的例子:

            GlobalScope.launch(Dispatchers.Unconfined) {
                for (i in 1..6) {
                    Log.d("AA", "协程任务打印第$i 次,时间: ${System.currentTimeMillis()}")
                }
            }
            for (i in 1..8) {
                Log.d("AA", "主线程打印第$i 次,时间:  ${System.currentTimeMillis()}")
            }

2. 协程内部碰到挂起时,协程就不会执行了,等待挂起完成才能继续后面的部分

大家还记得 suspend 这个关键字吗,suspend 表示挂起的意思,用来修饰方法的,一个协程内有多个 suspend 修饰的方法顺序书写时,代码也是顺序运行的,为什么,suspend 函数会将整个协程挂起,而不仅仅是这个 suspend 函数

单携程内多 suspend 函数运行

    suspend fun getToken(): String {
        delay(300)
        Log.d("AA", "getToken 开始执行,时间:  ${System.currentTimeMillis()}")
        return "ask"
    }

    suspend fun getResponse(token: String): String {
        delay(100)
        Log.d("AA", "getResponse 开始执行,时间:  ${System.currentTimeMillis()}")
        return "response"
    }

    fun setText(response: String) {
        Log.d("AA", "setText 执行,时间:  ${System.currentTimeMillis()}")
    }

// 运行时
    GlobalScope.launch(Dispatchers.Main) {
        Log.d("AA", "协程 开始执行,时间:  ${System.currentTimeMillis()}")
        val token = getToken()
        val response = getResponse(token)
        setText(response)
    }

在 getToken 方法将协程挂起时,getResponse 函数永远不会运行,只有等 getToken 挂起结速将协程恢复时才会运行

多协程间 suspend 函数运行

            GlobalScope.launch(Dispatchers.Unconfined){
                var token = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getToken()
                }.await()

                var response = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getResponse(token)
                }.await()

                setText(response)
            }

注意我外面要包裹一层 GlobalScope.launch,要不运行不了。哈哈 suspend 虽然把所在挂起了,但是我们在这里使用的是多协程,一个协程挂起了,但是还有别的协程呢,所以大家请注意 await 函数,await 会挂起(阻塞)最外层协程(GlobalScope.launch 这段),这样代码还是顺序执行的,这适用于我们有多个 IO 操作的时候,这比 rxjava 要省事不少啊

3. 协程挂起时何时恢复

这个问题值得我们研究,毕竟代码运行是负载的,协程之外线程里肯定还有需要执行的代码,我们来看看前面的协程在挂起然后合适才能恢复执行

我们把上面的方法延迟改成 1ms ,2ms

            GlobalScope.launch(Dispatchers.Unconfined) {
                Log.d("AA", "协程 开始执行,时间:  ${System.currentTimeMillis()}")
                val token = getToken()
                val response = getResponse(token)
                setText(response)
            }

            for (i in 1..10) {
                Log.d("AA", "主线程打印第$i 次,时间:  ${System.currentTimeMillis()}")
            }

协程在挂起后,虽然延迟的时间到了,但是还得等到 线程空闲时 才能继续执行,这里要注意,协程可没有竞争 cpu 时间段,协程挂起后即便 IO 阻塞结速了也不是马上就能恢复执行,需要我们自己控制,这里写不好是要出问题的

4. 协程挂起后再恢复时在哪个线程运行

为什么要写这个呢,在 Thread 中不存在这个问题,但是协程中有句话这样说的:哪个线程恢复的协程,协程就运行在哪个线程中,我们分别对 kotlin 提供的 3个协程调度器测试一下

我们用这段代码测试,分别设置 3个协程调度器

            GlobalScope.launch(Dispatchers.Main){
                Log.d("AA", "协程测试 开始执行,线程:${Thread.currentThread().name}")
                var token = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getToken()
                }.await()

                var response = GlobalScope.async(Dispatchers.Unconfined) {
                    return@async getResponse(token)
                }.await()

                setText(response)
            }
            Log.d("AA", "主线程协程后面代码执行,线程:${Thread.currentThread().name}")

注意协程内部,若是在前面有代码切换了线程,后面的代码若是没有指定线程,那么就是运行在这个切换到的线程上的,所以大家看上面的测试结果,setText 执行的线程都和上一个方法一样

我们最好给异步任务在外面套一个协程,这样我们可以挂挂起整个异步任务,然后给每段代码指定运行线程调度器,这样省的因为协程内部挂起恢复变更线程而带来的问题

Kotlin Coroutines封装异步回调、协程间关系及协程的取消 一文中,作者分析了源码,非 Dispatchers.Main 调度器的协程,会在协程挂起后把协程当做一个任务 DelayedResumeTask 放到默认线程池 DefaultExecutor 队列的最后,在延迟的时间到达才会执行恢复协程任务。虽然多个协程之间可能不是在同一个线程上运行的,但是协程内部的机制可以保证我们书写的协程是按照我们指定的顺序或者逻辑自行

看个例子:

    suspend fun getToken(): String {
        Log.d("AA", "getToken start,线程:${Thread.currentThread().name}")
        delay(100)
        Log.d("AA", "getToken end,线程:${Thread.currentThread().name}")
        return "ask"
    }

    suspend fun getResponse(token: String): String {
        Log.d("AA", "getResponse start,线程:${Thread.currentThread().name}")
        delay(200)
        Log.d("AA", "getResponse end,线程:${Thread.currentThread().name}")
        return "response"
    }

    fun setText(response: String) {
        Log.d("AA", "setText 执行,线程:${Thread.currentThread().name}")
    }

    // 实际运行
    GlobalScope.launch(Dispatchers.IO) {
        Log.d("AA", "协程测试 开始执行,线程:${Thread.currentThread().name}")
        var token = GlobalScope.async(Dispatchers.IO) {
            return@async getToken()
        }.await()

        var response = GlobalScope.async(Dispatchers.IO) {
            return@async getResponse(token)
        }.await()

        setText(response)
    }

5. relay,yield 的区别

relay 和 yield 方法是协程内部的操作,可以挂起协程,区别是 relay 是挂起协程并经过执行时间恢复协程,当线程空闲时就会运行协程;yield 是挂起协程,让协程放弃本次 cpu 执行机会让给别的协程,当线程空闲时再次运行协程。我们只要使用 kotlin 提供的协程上下文类型,线程池是有多个线程的,再次执行的机会很快就会有的。

除了 main 类型,协程在挂起后都会封装成任务放到协程默认线程池的任务队列里去,有延迟时间的在时间过后会放到队列里去,没有延迟时间的直接放到队列里去

6. 协程的取消

我们在创建协程过后可以接受一个 Job 类型的返回值,我们操作 job 可以取消协程任务,job.cancel 就可以了

        // 协程任务
        job = GlobalScope.launch(Dispatchers.IO) {
            Log.d("AA", "协程测试 开始执行,线程:${Thread.currentThread().name}")
            var token = GlobalScope.async(Dispatchers.IO) {
                return@async getToken()
            }.await()

            var response = GlobalScope.async(Dispatchers.IO) {
                return@async getResponse(token)
            }.await()

            setText(response)
        }
      
        // 取消协程
        job?.cancel()
        Log.d("AA", "btn_right 结束协程")

协程的取消有些特质,因为协程内部可以在创建协程的,这样的协程组织关系可以称为父协程,子协程:

现在问题来了,在 Thread 中我们想关闭线程有时候也不是掉个方法就行的,需要我们自行在线程中判断县城是不是已经结束了。在协程中一样,cancel 方法只是修改了协程的状态,在协程自身的方法比如 realy,yield 等中会判断协程的状态从而结束协程,但是若是在协程我们没有用这几个方法怎么办,比如都是逻辑代码,这时就要我们自己手动判断了,使用 job.isActive ,isActive 是个标记,用来检查协程状态


其他内容

我也是初次学习使用协程,这里放一些暂时没高彻底的内容

1. Mutex 协程互斥锁

线程中锁都是阻塞式,在没有获取锁时无法执行其他逻辑,而协程可以通过挂起函数解决这个,没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞可以执行其他逻辑。这种互斥锁就是 Mutex,它与 synchronized 关键字有些类似,还提供了 withLock 扩展函数,替代常用的 mutex.lock; try {...} finally { mutex.unlock() }

更多的使用经验就要大家自己取找找了

fun main(args: Array<String>) = runBlocking<Unit> {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter ++
            }
        }
    }
    println("The final count is $counter")
}

协程应用

1. 协程请求网络数据

我们用带返回值的协程 GlobalScope.async 在 IO 线程中去执行网络请求,然后通过 await 返回请求结果,用launch 在主线程中更新UI就行了,注意外面用 runBlocking 包裹

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        coroutine.setOnClickListener { click() }
    }

    private fun click() = runBlocking {
        GlobalScope.launch(Dispatchers.Main) {
            coroutine.text = GlobalScope.async(Dispatchers.IO) {
                // 比如进行了网络请求
                // 放回了请求后的结构
                return@async "main"
            }.await()
        }
    }
}

我们以 launch 函数为例

上一篇 下一篇

猜你喜欢

热点阅读