kotlin coroutines 协程教程(一) 介绍及关键A
kotlin coroutines 协程
Coroutine 协程,是kotlin 上的一个轻量级的线程库,对比 java 的 Executor,主要有以下特点:
- 更轻量级的 api 实现协程
- async 和 await 不作为标准库的一部分
- suspend 函数,也就是挂起函数是比 java future 和 promise 更安全并且更容易使用
那么实际本质上和线程池有什么区别呢?我的理解是这样的,协程是在用户态对线程进行管理的,不同于线程池,协程进一步管理了不同协程切换的上下文,协程间的通信,协程挂起,对于线程挂起,粒度更小,而且一般不会直接占用到CPU 资源,所以在编程发展的过程中,广义上可以认为 多进程->多线程->协程。
协程并不会映射成内核线程或者其他这么重的资源,它的调度在用户态就可以搞定,任务之间的调度并非抢占式,而是协作式的。
简单使用
首先,要引入 coroutines 的依赖,在你的 build.gradle
dependencies {
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.0'
}
然后下面是一个最简单的例子,在子线程延迟打印一行日志:
fun coroTest() {
// Globals 是 Coroutines 的一个 builder
GlobalScope.launch {
delay(1000L)//Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
Thread.sleep(2000)
Log.i(CO_TAG, "launch ")
}
Log.i(CO_TAG, "----")
}
控制台输出效果如下:
01-05 11:11:40.373 28131-28131/com.yy.yylite.kotlinshare I/coroutine: ----
01-05 11:11:43.375 28131-3159/com.yy.yylite.kotlinshare I/coroutine: launch
也就是说在子线程 delay 1000 毫秒,然后 sleep 2000 毫秒,之后打印出来了,确实达到了我们理想的状态。
接着看下 Android studio 的 cpu profiler,我们可以看到,这里启动了几个新的子线程:
image这里会创建名为DefaultDispatch 的子线程,做个一个简单的实验,不断的重复执行上面的代码,并不会无限创建子线程,看了其内部的线程数也是有约束的。这个可能比直接调度线程池,更加节省资源,也避免了极端情况。(实际上 delay() 是一个非阻塞的挂起函数)
blocking 和 non-blocking 函数
delay{} 是 非阻塞函数,Thread.sleep() 则是阻塞函数,coroutines 中使用 runBlocking{} 作为阻塞函数,例如以下代码:
fun testBlockAndNoBlock() {
//非阻塞,子线程
GlobalScope.launch {
delay(1000)
doLog("no-block")
}
doLog("non block test")
//会阻塞主线程
runBlocking {
delay(3000)
doLog("block")
}
doLog("block test")
}
则控制台输出结果为:
01-05 15:27:37.982 20264-20264/com.yy.yylite.kotlinshare I/coroutine: non block test
01-05 15:27:38.984 20264-20312/com.yy.yylite.kotlinshare I/coroutine: no-block
01-05 15:27:40.983 20264-20264/com.yy.yylite.kotlinshare I/coroutine: block
block test
结果就是使用 runBlocking 会阻塞主线程,那么这个在实际开发中有任何用途吗?实际上,runBlocking{} 不是直接用在协程中的,常常用于桥接一些挂起函数操作,用于顶底函数或者Junit Test中,例如如下代码:
fun testBlock() = runBlocking {
val job= launch {
delay(1000)
doLog("in run block")
}
job.join()
}
这里将 join() 和 launch{} 进行桥接,使他们能够在一个地方执行。
等待
上面提到了可以通过 delay() 来等待一个函数执行,并且是非阻塞的,coroutines 中也提供了另一种等待机制,简单的例子如下:
fun testWaitJob() {
val job = GlobalScope.launch {
delay(2000)
doLog("waite")
}
doLog("main doing")
GlobalScope.launch {
job.join()
doLog("really excute")
}
}
最终输出结果如下,也就是 join()方法
01-05 21:45:53.472 13230-13230/com.yy.yylite.kotlinshare I/coroutine: main doing
01-05 21:45:55.475 13230-13803/com.yy.yylite.kotlinshare I/coroutine: waite
01-05 21:45:55.476 13230-13803/com.yy.yylite.kotlinshare I/coroutine: really excute
任务取消
某个场景下,你开启了一个协程,但是因为一些原因,你要取消这个协程,那么你可以这样处理,使用一下 Job.cancel() 方法取消协程,如下例子:
fun testCancel2() {
doLog("test cancel")
val job = GlobalScope.launch {
for (index in 1..30) {
doLog("print $index")
delay(100)
}
}
doLog("no waite repeat")
GlobalScope.launch {
delay(1000)
doLog("cancel ")
job.cancel()
}
}
控制台输出如下,也就是表示通过 job.cancel() 将执行的协程取消了。
01-05 21:52:20.684 17521-17521/com.yy.yylite.kotlinshare I/coroutine: test cancel
01-05 21:52:20.696 17521-17521/com.yy.yylite.kotlinshare I/coroutine: no waite repeat
01-05 21:52:20.698 17521-17784/com.yy.yylite.kotlinshare I/coroutine: print 1
01-05 21:52:20.801 17521-17788/com.yy.yylite.kotlinshare I/coroutine: print 2
01-05 21:52:20.901 17521-17785/com.yy.yylite.kotlinshare I/coroutine: print 3
01-05 21:52:21.002 17521-17787/com.yy.yylite.kotlinshare I/coroutine: print 4
01-05 21:52:21.105 17521-17787/com.yy.yylite.kotlinshare I/coroutine: print 5
01-05 21:52:21.219 17521-17793/com.yy.yylite.kotlinshare I/coroutine: print 6
01-05 21:52:21.320 17521-17785/com.yy.yylite.kotlinshare I/coroutine: print 7
01-05 21:52:21.421 17521-17788/com.yy.yylite.kotlinshare I/coroutine: print 8
01-05 21:52:21.522 17521-17786/com.yy.yylite.kotlinshare I/coroutine: print 9
01-05 21:52:21.623 17521-17793/com.yy.yylite.kotlinshare I/coroutine: print 10
01-05 21:52:21.706 17521-17799/com.yy.yylite.kotlinshare I/coroutine: cancel
但是实际上, Job 的状态分为以下几种情况:
State | isActive | isCompleted | isCancelled |
---|---|---|---|
New (optional initial state) | False | False | False |
Active(default initial state) | True | False | False |
Completing(transient state) | True | False | False |
Cancelling(transient state) | False | False | True |
Cancelled(final state) | False | True | True |
Completed(final state) | False | True | False |
那么一个 job 的执行流程如下:
* +-----+ start +--------+ complete +-------------+ finish +-----------+
* | New | -----> | Active | ---------> | Completing | -------> | Completed |
* +-----+ +--------+ +-------------+ +-----------+
* | cancel / fail |
* | +----------------+
* | |
* V V
* +------------+ finish +-----------+
* | Cancelling | --------------------------------> | Cancelled |
* +------------+ +-----------+
那么一个job 的状态根据执行过程,不断发生变化。其次,子job 和父job 相互关联,取消父job 会先依次取消子 job,同样子 Job 取消或者失败也会影响到父 Job 。
launch{} , runBlocking{} ,async{}
launch{} 会在当前线程开启一个新的协程,并且不会阻塞当前线程,同时会返回一个 Job 做为 coroutine 的引用,你可以通过这个 Job 取消对应的 Coroutine。
runBlocking {} 会在开启一个新的协程,并且阻塞当前进程,直到操作完成。这个函数不应该在协程里面使用,它是用来桥接需要阻塞的挂起函数,主要用于 main function 和 junit 测试。也就是说,runBolcking {} 必须用在最上层。
async{} 会在对应的 CoroutineContext 下创建一个新的协程,并且放回一个Deferred,通过 Deferred 可以异步获取结果,也就是调用Deffered 的 await() 方法。
先来看下三者的源码:
//launch
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
//runBlocking
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
//async
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
在 launch 里面会创建一个新的 CoroutineContext,如果没有传入 Context 则使用的 EmptyCoroutineContext,通过 newCoroutineContext() 函数会分配一个默认的 Dispatcher,也就是 Dispatcher.default,默认的全局 Dispatcher,会在jvm 层级共享线程池,会创建等于cpu 内核数目的线程(但是至少创建两个子线程)。接着判断 CoroutineStart 是否 Lazy 模式,如果 Lazy 模式,则该 Coroutine 不会立马执行,需要你主动掉了 Job.start() 之后才会执行。
协程 Channel
Channel 提供了一个简单传递的方式 在Coroutines 中传递一个简单的值,同时可以传递一连串的值。但是Channel 还是处于实验阶段,其 api 和使用方式可能随时改变。
channel 基本用法
Channel 类似于 BlockQueue,主要的区别是不同于 BlockQueue,其put() 方式提供阻塞,但是 Channel 提供了挂起的一个操作send(),同样的对应 BlockQueue 的 take() 操作,Channel 提供了一个挂起的 receive() 函数。
为什么说 BlockQuque 的put() 操作是阻塞的呢?
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
也就是说,如果调用 put() 函数的时候,队列容量超过了上限,则会一直的 block 住。
下面是 Channel send() 和 receive() 的一个简单例子:
fun testChannel1() {
val channel = Channel<Int>()
GlobalScope.launch {
for (i in 1..5) channel.send(i * i)
}
GlobalScope.launch {
repeat(5) {
doLog("receive ${channel.receive()}")
}
}
}
控制台输出如下:
01-06 10:20:08.703 8274-8621/com.yy.yylite.kotlinshare I/channel: receive 1
receive 4
01-06 10:20:08.714 8274-8620/com.yy.yylite.kotlinshare I/channel: receive 9
receive 16
01-06 10:20:08.715 8274-8620/com.yy.yylite.kotlinshare I/channel: receive 25
管道也提供了关闭方法 close() 你需要在所有send() 执行完毕之后,调用 close() 函数:
val channel = Channel<Int>()
GlobalScope.launch {
for (i in 1..5) channel.send(i * i)
channel.close()
}
调用 close() 函数之后,send() 操作会立马停止,只是意味着channel 不会处理新的 send(),之前发送的 send() b并不会被丢弃,同时 channel.isClosedForReceive 会立即放回 true,等待之前挂起的 send() 操作的结果,被 receive() 之后,channel.isClosedForReceive 才会返回 true。多次调用 close()是无效的,后续的调用都会返回 false,表示之前已经有人把该 channel close了。
回到我们创建 Channel 的代码,我们通过 Channel<E>() 创建一个 Channel,对应的源码如下:
//这是在 Channel.kt 里面的一个包级函数
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
我们可以看到,这里其实是有一个参数的,只不过默认值是 RENDEZVOUS,那么这里的区别是啥呢?
- RENDEZVOUS 默认模式,也就是 0,表示Channel 没有buffer,SendChannel 的 send() 方法会等待 ReceiveChannel 的 receive() 方法,反之,ReceiveChannel 的 receive() 方法也会等待SendChannel 的 send() 方法。
- UNLIMITED 也就是 Integer.Max,会创建 LinkedList 类型的无限量的 Channel队列
- CONFLATED 也就是 -1,会创建最多只有一个容量的 Channel 队列,也就是你receive 的肯定是,最新的一个 SendChannel send() 出来的。
- 其它大于0 的数值,则会创建对应长度的 buffer 数组的 Channel 队列
Channel 与生产者模式
你可能在业务场景中,碰到这样一种需求,某个列表需要请求多个接口,才能完成展示功能,并且这些接口可能是依赖的。
看下面的代码:
fun testChannelProduce() {
val square = GlobalScope.produceSquares()
GlobalScope.launch {
square.consumeEach {
doLog("Consume $it")
}
}
}
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (i in 1..5) send(i * i)
}
其实这里关注 produce() 方法,它是 GlobalScope 的一个扩展方法,其源码如下:
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine
}
也就是说,produce() 函数最终返回一个ReceiveChannel,然后通过ReceiveChannel 的扩展函数,consumeEach() 去遍历这个 Channel。
Fan in 和 Fan out
fan out 也就是多个 Coroutine 会接收同一个 Channel,进行传递信息,简单的 demo 如下:
fun testFadInProduce() = runBlocking {
val number = fadInProduce()
repeat(5) {
fadInConsume(it, number)
}
delay(1000)//单次执行delay 了100,所以这里会执行10次
number.cancel()
}
fun CoroutineScope.fadInProduce(): ReceiveChannel<Int> = produce {
var start = 1
while (true) {
send(start++)
delay(100)
}
}
fun CoroutineScope.fadInConsume(id: Int, channel: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (msg in channel) doLog("process id:$id msg:$msg")
}
然后控制台输出如下:
2019-02-07 17:12:46.777 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:0 msg:1
2019-02-07 17:12:46.878 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:0 msg:2
2019-02-07 17:12:46.978 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:1 msg:3
2019-02-07 17:12:47.079 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:2 msg:4
2019-02-07 17:12:47.179 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:3 msg:5
2019-02-07 17:12:47.280 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:4 msg:6
2019-02-07 17:12:47.380 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:0 msg:7
2019-02-07 17:12:47.480 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:1 msg:8
2019-02-07 17:12:47.581 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:2 msg:9
2019-02-07 17:12:47.681 27720-27720/com.yy.yylite.kotlinshare I/channel: process id:3 msg:10
在这个案例中,
相反在 fan-in 指的是,多个 Coroutine 会使用同一个 Channel 发送消息,案例如下:
fun testFanOut() = runBlocking {
val channel = Channel<String>()
launch { fanout(channel, "hello", 200) }
launch { fanout(channel, "world", 500) }
repeat(10) {
doLog("fanout-:${channel.receive()}")
}
coroutineContext.cancelChildren()
}
suspend fun fanout(channel: SendChannel<String>, msg: String, delayMillis: Long) {
while (true) {
delay(delayMillis)
channel.send(msg)
}
}
控制台输出如下:
2019-02-07 17:28:17.356 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:17.556 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:17.656 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:world
2019-02-07 17:28:17.757 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:17.957 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:18.156 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:world
2019-02-07 17:28:18.157 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:18.558 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:hello
2019-02-07 17:28:18.656 29726-29726/com.yy.yylite.kotlinshare I/channel: fanout-:world
Channel 一种更通用的场景
我们经常碰到的业务是,进入一个 Activity,需要请求一个接口,但是如果立马退出 Activity 或者 Activity 进入后台,我们这个 request 需要做相应的操作,例如取消或者暂停请求。
如果使用传统的 java 多线程,我们需要通过设置一些标记位或者通过接口回调去实现这个逻辑,如果使用 Coroutine 呢?
我们通过一个简单的案例,来使用 Channel:
class ChatListModel(val callback: IChatListCallback) : IChatListModel, CoroutineScope {
val requestJob: Job = Job()
val sendChannel: Channel<List<ChatMsg>> = Channel()
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + requestJob
override fun requestChatList(size: Int, page: Int) {
this.launch {
doLog("request")
delay(1000)
val list: ArrayList<ChatMsg> = ArrayList<ChatMsg>()
for (index in 1..size) {
val msg = ChatMsg("我是消息 ${index + page * size}", System.currentTimeMillis(), (index + page * size).toLong())
list.add(msg)
}
sendChannel.send(list)
}
getMsgListBack()
}
fun getMsgListBack() = runBlocking(context = Dispatchers.Main) {
val list = sendChannel.receive()
doLog("receive msg")
callback.requestListBack(1, list)
}
override fun sendMsg(content: String, uid: Long, sequelId: Long) {
}
private fun doLog(msg: String) {
Log.i("ChatListModel", msg)
}
override fun onStart() {
}
override fun onDestroy() {
requestJob.cancel()
sendChannel.close()
}
}
这里的场景是,模拟一个 聊天窗口的场景,进入页面之后,从数据库读取消息,然后返回到界面显示。
使用 async
async {} 是一种异步操作,会创建一个新的 coroutines,并且返回一个可以异步获取结果的Deferred,Deffered继承了 future。
简单的例子如下:
private suspend fun sunpendF1(): Int {
doLog("suspend fun 1 begin")
delay(1000)
doLog("suspend fun 1 end")
return 2
}
private suspend fun sunpendF2(): Int {
doLog("suspend fun 2 begin")
delay(1000)
doLog("suspend fun 2 end")
return 4
}
fun testAsync() {
val one = GlobalScope.launch {
val result1 = async {
sunpendF1()
}
val reslut2 = async {
sunpendF2()
}
doLog("result:${result1.await() + reslut2.await()}")
}
}
然后控制台输出如下:
01-08 19:47:37.763 25548-26803/com.yy.yylite.kotlinshare I/suspend: suspend fun 2 begin
01-08 19:47:37.764 25548-26802/com.yy.yylite.kotlinshare I/suspend: suspend fun 1 begin
01-08 19:47:38.766 25548-26803/com.yy.yylite.kotlinshare I/suspend: suspend fun 2 end
01-08 19:47:38.766 25548-26804/com.yy.yylite.kotlinshare I/suspend: suspend fun 1 end
01-08 19:47:38.770 25548-26804/com.yy.yylite.kotlinshare I/suspend: result:6
所以我们可以看到,两个