KotlinAndroid 技术文章

Kotlin Flow 介绍

2020-09-07  本文已影响0人  竖起大拇指

1.Kotlin Flow 介绍

Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。

A cold asynchronous data stream that sequentially emits values
 and completes normally or with an exception。

翻译下就是:按顺序发出值并正常完成或异常完成的Cold异步数据流。

2.flow使用

image.png

2.1 Flow的创建

  1. 可以使用flow构建函数构建一个Flow类型返回值的函数
  2. flow{}构建体中可以调用挂起函数,即上流
  3. 上流使用emit函数发射值
  4. 下流使用collect函数收集值
//上流函数
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        //下流接收数据
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("finished")
    }
}

结果:
1
2
3
finished

2.2 Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行。

fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
 
fun main() {
    runBlocking {
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect1 finished")
 
        simpleFlow().collect { value ->
            println(value)
        }
 
        println("collect2 finished")
    }
}

结果:
1
2
3
collect1 finished
1
2
3
collect2 finished

2.3 Flow的连续性

Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.filter {
            it % 2 == 0 //只取偶数
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}
结果:
String 2
String 4

2.4 Flow构建器

  1. flow{}
flow {
    (5 .. 10).forEach {
              emit(it)
         }
}.collect{
   println(it)
}
    
  1. flowOf() 帮助可变数组生成 Flow 实例
flowOf(1,2,3,4,5).collect { println(it) }

其实flowOf调用的就是第一种flow{},分别emit发送值,源码如下:

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
  1. asFlow() 面向数组、列表等集合
(5 ..10).asFlow().collect { 
                    println(it)
                }
消费数据

collect 方法和 RxJava 中的 subscribe 方法一样,都是用来消费数据的。
除了简单的用法外,这里有两个问题得注意一下:

3.切换线程

3.1切换线程使用的是flowOn操作符。

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

简单点理解就是flowOn之前的操作符运行在flowOn指定的线程之内,flowOn之后的操作符运行在整个flow运行的CoroutineContext内。

例如,下面的代码collect则是在main线程:

fun main() = runBlocking {
  flowOf(1,2,3,4,5)
                    .flowOn(Dispatchers.Default)
                    .collect { 
                    println(Thread.currentThread().name+" "+it) 
                }
}

打印如下:

main 1
main 2
main 3
main 4
main 5

3.2 除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文。

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主线程等待这个协程执行结束
    }
}
结果:
flow :DefaultDispatcher-worker-1
collect:DefaultDispatcher-worker-1 1
collect:DefaultDispatcher-worker-1 2
collect:DefaultDispatcher-worker-1 3
collect:DefaultDispatcher-worker-1 4
collect:DefaultDispatcher-worker-1 5

4.背压

上流每次发射耗时1s,下流接收耗时3s,那么它们总共会耗时多久

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow
                .collect {
                delay(3000)
                println("$it")
            }
        }
 
        println("time : $time ms")
    }
}
结果:
1
2
3
time : 12073 ms

可以看出,一般情况下,上下流执行是同步的.

4.1 使用buff,来让上流不等待下流接收,而是发射到缓存区

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow.buffer(50)//指定缓存区大小为50个
                .collect {
                delay(3000)
                println("$it")
            }
        }
 
        println("time : $time ms")
    }
}
结果:
1
2
3
time : 10158 ms
时间是1s + 3s * 3

4.2 有时我们不需要一个不漏的接收上流的元素时,可以使用conflate,下流来不及处理的会被丢弃掉

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow.conflate()
                .collect {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
结果:
1
3
time : 7124 ms

4.3 collectLast可以只接收上流发射的最后一个元素.

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
 
        val time = measureTimeMillis {
            flow
                .collectLatest {
                    delay(3000)
                    println("$it")
                }
        }
 
        println("time : $time ms")
    }
}
time : 6144 ms

5.Flow操作符

1.transform

在使用transform操作符时,可以任意多次调用emit。

runBlocking {

                (1..5).asFlow()
                    .transform {
                        emit(it * 2)
                        delay(100)
                        emit(it * 4)
                    }
                    .collect { println("transform:$it") }
            }

打印如下:

transform:2
transform:4
transform:4
transform:8
transform:6
transform:12
transform:8
transform:16
transform:10
transform:20
2.take

take操作符只取前几个emit发射。

  (1 .. 5).asFlow().take(2).collect {
                    println("take:$it")
                }

打印结果:

take:1
take:2
3.reduce
runBlocking {
                val sum=( 1 ..5).asFlow()
//                    .map {
//                    //println("map:${it}")
//                    it*it  }   //1,4,9,16,25

                    .reduce { a, b ->
                        println("reduce:${a},${b}")
                        a*b
                    }

                 println(sum)

            }

打印如下:

reduce:1,2
reduce:2,3
reduce:6,4
reduce:24,5
120

reduce理解起来稍微有点麻烦,我们看看源码实现加深理解:

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    var accumulator: Any? = NULL

    collect { value ->
        accumulator = if (accumulator !== NULL) {
            @Suppress("UNCHECKED_CAST")
            operation(accumulator as S, value)
        } else {
            value
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    @Suppress("UNCHECKED_CAST")
    return accumulator as S
}

简单点理解就是两个元素操作之后拿到的值跟后面的元素进行操作,用于把flow 简化合并为一个值。

4.fold
runBlocking {
(1 ..5).asFlow().fold(2,{
                        a, b -> a * b
                })
}

5.zip

zip可以将2个flow进行合并的操作符

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

打印如下:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 540 ms

如果flowA中的item个数大于flowB中的item个数,执行合并后新flow的item个数=较小的flow的item个数。

6.flattenMerge/flattenConcat

flattenMerge不会组合多个flow,而是将它们作为单个流执行。

val flowA = (1..5).asFlow()
val flowB = flowOf("one", "two", "three", "four", "five")

  flowOf(flowA,flowB).flattenMerge(2).collect {
                    println("flattenMerge:$it")
                }

                flowOf(flowA,flowB).flattenConcat().collect{println("flattenConcat:$it")}

打印如下:

flattenMerge:1
flattenMerge:2
flattenMerge:3
flattenMerge:4
flattenMerge:5
flattenMerge:one
flattenMerge:two
flattenMerge:three
flattenMerge:four
flattenMerge:five

flattenConcat:1
flattenConcat:2
flattenConcat:3
flattenConcat:4
flattenConcat:5
flattenConcat:one
flattenConcat:two
flattenConcat:three
flattenConcat:four
flattenConcat:five

展平操作符

类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了

7.flatMapConcat

flatMapConcat由map,flattenMerge操作符联合完成。
源码如下:

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

测试代码:

fun currTime() = System.currentTimeMillis()

            var start: Long = 0
            runBlocking {

                (1..5).asFlow()
                    .onStart { start = currTime() }
                    .onEach { delay(100) }
                    .flatMapConcat {
                        flow {
                            emit("$it: First")
                            delay(500)
                            emit("$it: Second")
                        }
                        
                    }
                    .collect {
                        println("$it at ${System.currentTimeMillis() - start} ms from start")
                    }
            }

在调用 flatMapConcat 后,collect 函数在收集新值之前会等待 flatMapConcat 内部的 flow 完成
打印如下:

1: First at 124 ms from start
1: Second at 625 ms from start
2: First at 726 ms from start
2: Second at 1228 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
4: First at 1930 ms from start
4: Second at 2431 ms from start
5: First at 2532 ms from start
5: Second at 3033 ms from start
8.flatMapMerge

并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

可以看出来flatMapMerge并发特性:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
9.flatMapLatest

flatMapLatest和collectLatest操作符很像,只有新flow发射了新值,那么上个flow就会被取消。

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

打印如下:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
10.conflate

当一个flow表示操作的部分结果或者操作状态更新,它可能并不需要取处理每一个值,但是需要处理最近的一个值。在这种场景下,conflate操作符可以被用于忽略中间操作符。是一种对emit和collector慢处理的一种方式,它通过丢弃一些值来实现。

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

打印如下:

1
3
Collected in 758 ms

5.Flow的异常处理

当运算符中的发射器或代码抛出异常,可以有两种方式处理
1.try catch
2.catch函数

1.try catch适用于收集时发生的异常
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }
 
        try {
            flow.collect {
                println(it)
                throw RuntimeException()
            }
        } catch (e: Exception) {
            print("caught: $e")
        }
    }
}
2.虽然上流也可以使用try catch,但是更推荐catch函数
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                throw RuntimeException()
            }
        }.catch { e ->
            print("caught1: $e")
        }.collect {
            println(it)
        }
    }
}

6.Flow的完成

1.有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式

fun main() {
    runBlocking {
        try{
            val flow = flow {
                for (i in 1..3) {
                    emit(i)
                }
            }.collect {
                println(it)
            }
        }finally {
            println("done")            
        }
    }
}

2.onCompletion函数

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }.onCompletion {
            println("done")
        }.collect {
            println(it)
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读