Kotlinkotlin收藏kotlin

kotlin--Flow的运用

2021-09-03  本文已影响0人  aruba

Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。

一、Flow的使用

1.Flow的创建

1.可以使用flow构建函数构建一个Flow类型返回值的函数
2.flow{}构建体中可以调用挂起函数,即上流
3.上流使用emit函数发射值
4.下流使用collect函数收集值

//上流函数
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        //下流接收数据
        simpleFlow().collect { value ->
            println(value)
        }

        println("finished")
    }
}

结果:
1
2
3
finished

2.Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow().collect { value ->
            println(value)
        }

        println("collect1 finished")

        simpleFlow().collect { value ->
            println(value)
        }

        println("collect2 finished")
    }
}

结果:
1
2
3
collect1 finished
1
2
3
collect2 finished

3.Flow的连续性

Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.filter {
            it % 2 == 0 //只取偶数
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}

结果:
String 2
String 4

4.Flow构建器

除了使用flow函数外,还有两种方式
1.flowOf函数
2.使用.asFlow()扩展函数,可以将各种集合与序列转为流

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()

        flowOf(3, 5, 7)
            .onEach { delay(100) }
            .collect {
                println("${System.currentTimeMillis() - startTime}ms $it")
            }

        (3..6).asFlow().collect { println(it) }
    }
}

结果:
131ms 3
239ms 5
350ms 7
3
4
5
6

5.collect为挂起函数,但是Flow也提供了flowOn函数方便我们指定上流是否使用子协程执行
fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .collect {
                println("collect:${Thread.currentThread().name} $it")
            }
    }
}

结果:
flow :DefaultDispatcher-worker-1
collect:main 1
collect:main 2
collect:main 3
collect:main 4
collect:main 5

下流还是会使用主协程的上下文

6.除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主线程等待这个协程执行结束
    }
}

结果:
flow :DefaultDispatcher-worker-1
collect:DefaultDispatcher-worker-1 1
collect:DefaultDispatcher-worker-1 2
collect:DefaultDispatcher-worker-1 3
collect:DefaultDispatcher-worker-1 4
collect:DefaultDispatcher-worker-1 5

7.Flow的取消

Flow的取消和协程的取消相同,流的收集是CPU密集型的,但是如果收集时有挂起函数,那么挂起函数可以抛出取消异常来中断执行
使用了新协程的情况,可以使用cancel:

fun main() {
    runBlocking {
        val flow = flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))

        delay(200)
        flow.cancel()
        flow.join()
    }
}

使用timeout:

fun main() {
    runBlocking {
        withTimeoutOrNull(300){
            flow {
                println("flow :${Thread.currentThread().name}")
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.flowOn(Dispatchers.Default)
                .collect { println("collect:${Thread.currentThread().name} $it") }
        }
        
        println("finished")
    }
}
8.Flow的取消检测

之前我们调用子协程的取消时,CPU密集型代码并不能结束运行,在不使用挂起函数的情况下,我们在子协程体中通过ensureActive函数来检测该协程是否被取消了
1.而Flow为了方便,Flow构建器会对每个发射值(emit函数)执行ensureActive函数来进行取消

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                emit(i)
            }
        }
            .collect {
                println("$it")
                if (it > 2)
                    cancel()
            }

        println("finished")
    }
}

结果:
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@2833cc44

2.出于性能考虑,大多数其他流操作不会执行检测,此时我们可以使用cancellable函数来指定该Flow是可以取消的

fun main() {
    runBlocking {
        val flow = flowOf(1, 2, 3, 5)
            .cancellable()//不指定,那么将不执行取消检测
            .collect {
                println("$it")
                if (it > 2)
                    cancel()
            }

        println("finished")
    }
}

结果:
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7c29daf3

9.背压

上流每次发射耗时1s,下流接收耗时3s,那么它们总共会耗时多久

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow
                .collect {
                delay(3000)
                println("$it")
            }
        }

        println("time : $time ms")
    }
}

结果:
1
2
3
time : 12073 ms
可以看出,一般情况下,上下流执行是同步的

1.使用buff,来让上流不等待下流接收,而是发射到缓存区

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.buffer(50)//指定缓存区大小为50个
                .collect {
                delay(3000)
                println("$it")
            }
        }

        println("time : $time ms")
    }
}

结果:
1
2
3
time : 10158 ms
时间是1s + 3s * 3

2.指定上流协程

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.flowOn(Dispatchers.IO)
                .collect {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

结果和1.是一样的

3.有时我们不需要一个不漏的接收上流的元素时,可以使用conflate,下流来不及处理的会被丢弃掉

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.conflate()
                .collect {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

结果:
1
3
time : 7124 ms

4.collectLast可以只接收上流发射的最后一个元素

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow
                .collectLatest {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

3
time : 6144 ms

二、操作符

上面我们也提到了Flow支持函数式编程,用法和之前学习的差不多

1.转换操作符

1.map函数

fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}

结果:
String 1
String 2
String 3

2.transform函数,还可以将上流的一个变为多个发射出去

fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.transform {
            emit("String1 $it")
            emit("String2 $it")
        }.collect {
            println(it)
        }
    }
}

结果:
String1 1
String2 1
String1 2
String2 2
String1 3
String2 3

2.限长操作符

take函数

fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.take(2).collect {
            println(it)
        }
    }
}

结果:
1
2

3.末端操作符

末端操作符是用于启动流的挂起函数,collect是最基础的末端操作符,除此以外还有其他的
1.转化为各种集合,如:toList或toSet
2.获取第一个元素(first)与确保流只发射一个元素(single)
3.flod与reduce将流整合到一个值

flod函数

fun main() {
    runBlocking {
       val value =  flow {
            for (i in 1..3) {
                emit(i)
            }
        }.map {
            it * it
        }.fold(0) { acc, value ->
            acc + value
        }
        
        print(value)
    }
}

结果:
14

4.组合操作符

zip函数

fun main() {
    runBlocking {
        val flow1 = flow {
            for (i in 1..3) {
                emit(i)
            }
        }

        val flow2 = flowOf("one", "two", "three")

        flow1.zip(flow2) { a, b ->
            "$a -> $b"
        }.collect { value ->
            println(value)
        }
    }
}

结果:
1 -> one
2 -> two
3 -> three

5.展平操作符

类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了
1.flatMapConcat

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapConcat {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果:
52ms first 1
570ms second 1
571ms first 2
1074ms second 2
1074ms first 3
1577ms second 3

2.flatMapMerge
和flatMapConcat不同,flatMapConcat是按流函数体中顺序执行,而flatMapMerge中遇到发射函数时,会一次性执行上流的所有发射

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapMerge {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果:
130ms first 1
130ms first 2
131ms first 3
632ms second 1
632ms second 2
632ms second 3

3.flatMapLatest
flatMapLatest中遇到第二个发射函数时,只会发射上流最后一次的元素

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapLatest {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果:
298ms first 1
300ms first 2
301ms first 3
806ms second 3

三、Flow的异常处理

当运算符中的发射器或代码抛出异常,可以有两种方式处理
1.try catch
2.catch函数

1.try catch适用于收集时发生的异常
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }

        try {
            flow.collect {
                println(it)
                throw RuntimeException()
            }
        } catch (e: Exception) {
            print("caught: $e")
        }
    }
}
2.虽然上流也可以使用try catch,但是更推荐catch函数
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                throw RuntimeException()
            }
        }.catch { e ->
            print("caught1: $e")
        }.collect {
            println(it)
        }
    }
}

四、Flow的完成

有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式

1.finally块

fun main() {
    runBlocking {
        try{
            val flow = flow {
                for (i in 1..3) {
                    emit(i)
                }
            }.collect {
                println(it)
            }
        }finally {
            println("done")            
        }
    }
}

2.onCompletion函数

fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }.onCompletion {
            println("done")
        }.collect {
            println(it)
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读