Android Kotlin(8)之《协程3》
2017-08-10 本文已影响1588人
小强彬
Android Kotlin第八篇 协程3。Kotlin系列源码在源码下载这里下载。我们一起来了解下Kotlin的协程,协程也是Kotlin重点。也许我有的地方没有写好,也欢迎大家提出问题,纠正问题。
前面我们已经简单了解了协程,这篇我们来看下协程的调度线程与Channels(通道)
一、调度线程
在前面我们经常用到“CommonPool”共享的线程池,那么除了共享的线程池以为还有哪些呢,如下:
fun test21() = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
log(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
log("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
log(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
log(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
- Unconfined:无限制的,自由的,不限于任何特定线程
- coroutineContext:返回这个coroutine的上下文。
- CommonPool:共享线程的公共池,类似java里线程池,只不过这里是公共的
-
newSingleThreadContext:创建一个单独的线程,在作业被取消时回收
目前线程调度还未完全搞懂,只了解了部分,不敢妄加猜测,调度线程就先了解到这里,后续我在补充
二、Channels(通道)
Channels,它不是一个阻止放操作,而是一个挂起发送,而不是一个阻塞操作,它有一个暂停接收。可用于线程间传递数据。
我们先来看一个简单的示例:
fun test22() = runBlocking<Unit> {
//定义一个通道
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) {
delay(1000L)
//在这里发送
channel.send(x * x)
}
}
repeat(5) {
//在这里接收
log(channel.receive().toString())
}
log("Done!")
}
输出:
08-10 14:48:52.452 10568-10568/com.xiaoqiang.kotlin I/test: 1
08-10 14:48:53.452 10568-10568/com.xiaoqiang.kotlin I/test: 4
08-10 14:48:54.452 10568-10568/com.xiaoqiang.kotlin I/test: 9
08-10 14:48:55.452 10568-10568/com.xiaoqiang.kotlin I/test: 16
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 25
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: Done!
08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 结束
1、关闭通道
假设发送的时候,遇到特殊情况,不在发生了,要关闭通道,怎么做呢,例如:
fun test23() = runBlocking<Unit> {
//定义一个通道
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) {
delay(1000L)
//在这里发送
channel.send(x * x)
if(x == 4){
log("关闭通道")
channel.close()
}
}
}
repeat(5) {
//在这里接收
try {
var a = channel.receive()
log(a.toString())
}catch (e: ClosedReceiveChannelException){
log("关闭通道报出异常ClosedReceiveChannelException")
}
log("等待接收")
}
log("Done!")
}
注意:我实际测试的时候发现,如果不抛出ClosedReceiveChannelException异常,那么会导致程序直接奔溃,所有这里你只需要在接收方抛出异常即可
我们也可把通道写成方法,实际测试发现方法里结束通道时在接收方可以不写抛出异常,例如:
fun produceSquares() = produce<Int>(CommonPool) {
for (x in 1..5) {
delay(1000L)
send(x * x)
if (x == 4){
channel.close()
}
}
}
fun test24() = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { log(it.toString()) }
log("Done!")
}
2、Pipelines
在这里我理解为管道运输,也就是说上面produce生成通道是可以两个链接起来,每块做不同的工作,然后输出,例如:
fun produceSquares1() = produce<Int>(CommonPool) {
for (x in 1..5) {
delay(1000L)//这里延迟就失效了,具体原因不清楚为啥会失效
send(x)
}
}
fun produceSquares2(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
for (x in 1..5) {
delay(1000L)
send((x * x)+1)
}
}
fun test25() = runBlocking<Unit> {
//始发地
val numbers = produceSquares1()
//通道二次加工
val squares = produceSquares2(numbers)
//最后输出
squares.consumeEach { log(it.toString()) }
log("Done!")
//关闭通道,回收
numbers.cancel()
squares.cancel()
}
输出:
08-10 14:56:31.742 19083-19083/com.xiaoqiang.kotlin I/test: 2
08-10 14:56:32.742 19083-19083/com.xiaoqiang.kotlin I/test: 5
08-10 14:56:33.742 19083-19083/com.xiaoqiang.kotlin I/test: 10
08-10 14:56:34.742 19083-19083/com.xiaoqiang.kotlin I/test: 17
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 26
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: Done!
08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 结束
3、Fan-out
多个coroutines 可以从一个通道接收,例如:
fun produceNumbers() = produce<Int>(CommonPool) {
var a = 1
while (true){
delay(100L)
send(a++)
}
}
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
channel.consumeEach {
log("launch#$id:收到: $it")
}
}
fun test26() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
输出:
08-10 15:01:07.228 24523-24563/? I/test: launch#0:收到: 1
08-10 15:01:07.338 24523-24563/? I/test: launch#1:收到: 2
08-10 15:01:07.438 24523-24562/? I/test: launch#2:收到: 3
08-10 15:01:07.538 24523-24563/? I/test: launch#4:收到: 4
08-10 15:01:07.638 24523-24562/? I/test: launch#3:收到: 5
08-10 15:01:07.738 24523-24563/? I/test: launch#0:收到: 6
08-10 15:01:07.838 24523-24563/? I/test: launch#1:收到: 7
08-10 15:01:07.938 24523-24562/? I/test: launch#2:收到: 8
08-10 15:01:08.038 24523-24563/com.xiaoqiang.kotlin I/test: launch#4:收到: 9
08-10 15:01:08.088 24523-24523/com.xiaoqiang.kotlin I/test: 结束
4、Fan-in
多个coroutines 可以发送到一个通道,例如
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
fun test27() = runBlocking<Unit> {
val channel = Channel<String>()
//这里两个launch间隔着向通道发送数据
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) {
//循环接收数据
log(channel.receive())
}
}
输出:
08-10 14:58:46.088 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.288 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.388 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
08-10 14:58:46.488 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.688 21748-21748/com.xiaoqiang.kotlin I/test: foo
08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: 结束
5、通道缓冲区
到目前为止所显示的通道没有缓冲区。当发送方和接收方相遇(又称会合)时,未缓冲通道传输元素。如果首先调用send,那么它将被挂起,直到被调用接收,如果首先调用接收,它将暂停,直到调用send。例如:
fun test28() = runBlocking<Unit> {
//创建缓冲区大小为4的通道
val channel = Channel<Int>(2)
launch(CommonPool) {
repeat(5) {
log("Sending $it")
channel.send(it) //当通道缓冲区满的时候挂起,直到下方接收的时候在继续
delay(100L)
}
}
delay(3000)
log("开始接收")
launch(CommonPool) {
repeat(5) {
val a= channel.receive()//开始接收通道缓冲区里数据,上方发送被激活,继续发送
log("Receive $a")
}
}
}
输出:
08-10 18:29:07.920 21685-21744/com.xiaoqiang.kotlin I/test: Sending 0
08-10 18:29:08.020 21685-21744/com.xiaoqiang.kotlin I/test: Sending 1
08-10 18:29:08.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 2
08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 开始接收
08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 结束
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 0
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 1
08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 2
08-10 18:29:11.030 21685-21744/com.xiaoqiang.kotlin I/test: Sending 3
08-10 18:29:11.030 21685-21745/com.xiaoqiang.kotlin I/test: Receive 3
08-10 18:29:11.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 4
08-10 18:29:11.130 21685-21745/com.xiaoqiang.kotlin I/test: Receive 4
如果我有未写好的地方,欢迎大家提出意见,谢谢大家的观赏!
全套源码下载这里源码会随着后面发布的Kotlin逐渐完善
a