KotlinKotlin-Coroutines

深入学习Kotlin之Flow(一),什么是Flow?Flow的

2020-11-11  本文已影响0人  不思进取的码农

目录

深入学习Kotlin之Flow(一),什么是Flow?Flow的基本使用)
深入学习Kotlin之Flow(二),Flow的操作符,以及协程的背压

前言

在DataStore里面有提到 DataStore是基于协程Flow实现的,那么什么是Flow呢?

什么是Flow

Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步流
类似 RxJava 的 Observable 、 Flowable 等等,所以很多人都用 Flow 与 RxJava 做对比。

Flow 相比于 RxJava 简单的太多了,你还记得那些 RxJava 傻傻分不清楚的操作符吗 Observable 、 Flowable 、 Single 、 Completable 、 Maybe 等等。

Flow解决了什么问题

相比之下Flow有了很不错的优点:

  • Flow 支持线程切换、背压
  • Flow 入门的门槛很低,没有那么多傻傻分不清楚的操作符
  • 简单的数据转换与操作符,如 map 等等
  • Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性
  • 易于做单元测试
  • 解决回调地狱的问题

Flow的基本使用

Flow能够返回多个异步值
              
fun simple(): Flow<Int> = flow { // 流构建器
    for (i in 1..3) {
        delay(100) // 假装我们在这里做了一些有用的事情
        emit(i) // 发送下一个值
    }
}

fun main() = runBlocking<Unit> {
    // 启动并发的协程以验证主线程并未阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // 收集这个流
    simple().collect { value -> println(value) } 

打印结果:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

其中 Flow 接口,只有一个 collect 函数

如果熟悉 RxJava 的话,则可以理解为 collect() 对应subscribe(),而 emit() 对应onNext()。

创建 Flow

创建Flow有几种方式:

 flowOf(1,2,3,4,5)
        .onEach {
            delay(100)
        }
        .collect{
            println(it)
        }
    listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }
 {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect{
        println(it)
    }

}

最后的 channelFlow builder 跟 flow builder 是有一定差异的。

flow 是 Cold Stream,在没有切换线程的情况下,生产者和消费者是同步非阻塞的。
channel 是 Hot Stream,而 channelFlow 实现了生产者和消费者异步非阻塞模型。
关于Cold StreamHot Stream我们后续会讲

切换线程

相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn,下面的例子中,展示了 flow builder 和 map 操作符都会受到 flowOn 的影响

  flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。

例如,下面的代码 collect() 则是在 main 线程:

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}

运行结果:

main: 1
main: 4
main: 9
main: 16
main: 25

flow 取消

如果 flow 是在协程被挂起了,那么 flow 是可以被取消的,否则不能取消。

fun main() = runBlocking {

    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

    println("Done")
}

运行结果:

1
2
Done

(每天学习一点点.每天进步一点点,分享不宜路过点个赞呀,喜欢的点个关注后续更新不断)

上一篇 下一篇

猜你喜欢

热点阅读