Basic Usage of Kotlin Flow

2021-11-17  Vic_wkx

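Kotlin's Flow, part of kotlinx.coroutines, supports stream-style programming: a producer emits a sequence of values asynchronously and a collector consumes them.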

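Basic Flow usage

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        // Cold flow: the builder block runs only once collect is called.
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }
}

Output: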

1
2
3
4
5
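
To make the "stream-style" part concrete, here is a minimal sketch of chaining intermediate operators before collecting. The helper names numbers and evenSquares are hypothetical and not from the original post; filter, map, and toList are standard kotlinx.coroutines.flow operators.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a cold flow that emits 1..5 each time it is collected.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}

// Hypothetical helper: keep the even values and square them.
fun evenSquares(): Flow<Int> = numbers()
    .filter { it % 2 == 0 }
    .map { it * it }

fun main() = runBlocking {
    // Nothing runs until a terminal operator such as toList or collect is called.
    println(evenSquares().toList()) // [4, 16]
}
```

Intermediate operators such as filter and map only describe the pipeline; the terminal operator is what actually starts the flow.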

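Flow lifecycle

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.onStart {
            // Runs before the first value is emitted.
            println("onStart")
        }.onCompletion {
            // Runs after the flow has finished.
            println("onCompletion")
        }.collect {
            println(it)
        }
    }
}

Output: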

onStart
1
2
3
4
5
onCompletion

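Flow lifecycle when an exception occurs

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
                // Fail right after the third value has been emitted.
                if (i == 3) throw Exception("my error")
            }
        }.onStart {
            println("onStart")
        }.onCompletion {
            println("onCompletion")
        }.collect {
            println(it)
        }
    }
}

Output: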

onStart
1
2
3
onCompletion
Exception in thread "main" java.lang.Exception: my error

As the output shows, when a Flow fails, onCompletion is still called first, and only then is the exception thrown to the caller.

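onCompletion can carry the exception

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
                if (i == 3) throw Exception("my error")
            }
        }.onStart {
            println("onStart")
        }.onCompletion { error ->
            // error is the failure cause, or null if the flow completed normally.
            println("onCompletion $error")
        }.collect {
            println(it)
        }
    }
}

Output: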

onStart
1
2
3
onCompletion java.lang.Exception: my error
Exception in thread "main" java.lang.Exception: my error
    at ...
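
Since the cause passed to onCompletion is null on normal completion, it can be used to tell the two outcomes apart. The following is a minimal sketch of that idea; source and describeCompletion are hypothetical helpers introduced only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1..3 and optionally fails after the last value.
fun source(fail: Boolean): Flow<Int> = flow {
    for (i in 1..3) emit(i)
    if (fail) throw IllegalStateException("my error")
}

// Hypothetical helper: collects the flow and reports how it completed.
suspend fun describeCompletion(fail: Boolean): String {
    var report = "not completed"
    try {
        source(fail)
            .onCompletion { cause ->
                report = if (cause == null) "completed normally" else "failed: ${cause.message}"
            }
            .collect { }
    } catch (e: IllegalStateException) {
        // onCompletion only observes the exception; it still reaches the caller.
    }
    return report
}

fun main() = runBlocking {
    println(describeCompletion(fail = false)) // completed normally
    println(describeCompletion(fail = true))  // failed: my error
}
```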

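Flow exception handling

Note that when catch and onCompletion are used together, their order affects the result. catch only handles exceptions from the operators above it, so an onCompletion placed before catch still receives the exception as its cause, whereas an onCompletion placed after catch sees a null cause because the exception has already been handled. In the example below, onCompletion comes first: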

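import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
                if (i == 3) throw Exception("my error")
            }
        }.onStart {
            println("onStart")
        }.onCompletion {
            println("onCompletion")
        }.catch { exception ->
            // Handles the upstream exception, so collect finishes without throwing.
            println("catch:$exception")
        }.collect {
            println(it)
        }
    }
}

Output: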

onStart
1
2
3
onCompletion
catch:java.lang.Exception: my error
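
For comparison, here is a sketch of the opposite ordering, with catch placed before onCompletion. It is not from the original post; failing and catchThenOnCompletion are hypothetical helpers, and the fallback value -1 is an arbitrary choice to show that catch may emit.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1..3, then fails.
fun failing(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
    throw Exception("my error")
}

// Hypothetical helper: returns the events in the order they occur.
suspend fun catchThenOnCompletion(): List<String> {
    val events = mutableListOf<String>()
    failing()
        .catch { e ->
            events += "catch: ${e.message}"
            emit(-1) // catch may emit a fallback value downstream
        }
        .onCompletion { cause -> events += "onCompletion: $cause" }
        .collect { events += "value: $it" }
    return events
}

fun main() = runBlocking {
    catchThenOnCompletion().forEach { println(it) }
    // value: 1
    // value: 2
    // value: 3
    // catch: my error
    // value: -1
    // onCompletion: null
}
```

With this ordering the exception is handled before it reaches onCompletion, so onCompletion runs last and its cause is null.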

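Flow retry mechanism

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        // retry(3) re-collects the upstream flow up to 3 more times after a failure,
        // so the values are emitted 4 times in total before the exception reaches catch.
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
                if (i == 3) throw Exception("my error")
            }
        }.retry(3).onStart {
            println("onStart")
        }.onCompletion {
            println("onCompletion")
        }.catch { exception ->
            println("catch:$exception")
        }.collect {
            println(it)
        }
    }
}

Output: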

onStart
1
2
3
1
2
3
1
2
3
1
2
3
onCompletion
catch:java.lang.Exception: my error
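
retry also accepts a predicate that decides whether a given exception should trigger another attempt. The sketch below assumes a hypothetical flaky helper that fails a fixed number of times before succeeding; the choice of IOException is only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical helper: fails with IOException on the first `failures` collections, then succeeds.
fun flaky(failures: Int): Flow<String> {
    var attempts = 0
    return flow {
        attempts++
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit("ok after $attempts attempts")
    }
}

fun main() = runBlocking {
    // Two failures fit within retry(3): the third attempt succeeds.
    println(flaky(2).retry(3) { it is IOException }.toList())
    // [ok after 3 attempts]

    // Four failures exceed retry(3): catch sees the last exception.
    println(flaky(4).retry(3) { it is IOException }.catch { emit("gave up: ${it.message}") }.toList())
    // [gave up: attempt 4 failed]
}
```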

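Flow timeout mechanism

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        // Cancel the whole collection if it takes longer than 200 ms.
        withTimeout(200) {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.onStart {
                println("onStart")
            }.onCompletion {
                // Also called when the flow is cancelled by the timeout.
                println("onCompletion")
            }.collect {
                println(it)
            }
        }
    }
}

Output: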

onStart
1
onCompletion
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms
    at ...

Handling a Flow timeout

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        try {
            withTimeout(200) {
                flow {
                    for (i in 1..5) {
                        delay(100)
                        emit(i)
                    }
                }.onStart {
                    println("onStart")
                }.onCompletion {
                    println("onCompletion")
                }.collect {
                    println(it)
                }
            }
        } catch (e: TimeoutCancellationException) {
            // withTimeout signals the timeout by throwing this exception.
            println("Timeout")
        }
    }
}

Output:

onStart
1
onCompletion
Timeout

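A timeout from withTimeout cancels the collecting coroutine and surfaces as a TimeoutCancellationException, so it is handled here with try-catch around the withTimeout call. withTimeoutOrNull is an alternative that returns null on timeout instead of throwing.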
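
As an alternative to try-catch, the standard withTimeoutOrNull function returns null when the time limit is hit. The following is a minimal sketch under that assumption; collectWithin and its 250 ms limit are hypothetical and chosen only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects a slow flow under a time limit.
// Returns the values received so far and whether the limit was hit.
suspend fun collectWithin(limitMs: Long): Pair<List<Int>, Boolean> {
    val received = mutableListOf<Int>()
    val finished = withTimeoutOrNull(limitMs) {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { received += it }
        true // only reached if the collection finished in time
    }
    return received to (finished == null)
}

fun main() = runBlocking {
    val (values, timedOut) = collectWithin(250)
    println("values=$values timedOut=$timedOut")
}
```

With a 250 ms limit and one value every 100 ms, the collection should be cut off after the first couple of values, and no exception needs to be caught.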

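Flow execution time

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..5) {
                delay(100) // producing each value takes 100 ms
                println("emit: $i")
                emit(i)
            }
        }.collect {
            delay(100) // consuming each value takes another 100 ms
            println("collect: $it")
        }
        println("Cost: ${System.currentTimeMillis() - startTime}ms")
    }
}

Output: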

emit: 1
collect: 1
emit: 2
collect: 2
emit: 3
collect: 3
emit: 4
collect: 4
emit: 5
collect: 5
Cost: 1105ms

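The total is about 1000 ms, that is 5 × (100 ms + 100 ms), because by default the producer and the collector run sequentially in the same coroutine and take turns: emit does not return until the collector has finished processing the value.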
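
To see that the alternation is what accounts for the 1000 ms, the same workload can be timed with and without the standard buffer operator, which lets the producer run concurrently with the collector. This comparison is not part of the original post; slowSource and timeCollect are hypothetical helpers, and the timings in the comments are rough estimates rather than measured results.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: 5 values, 100 ms to produce each.
fun slowSource(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Hypothetical helper: time a collection whose collector takes 100 ms per value.
suspend fun timeCollect(source: Flow<Int>): Long = measureTimeMillis {
    source.collect { delay(100) }
}

fun main() = runBlocking {
    // Sequential: roughly 5 * (100 + 100) ms.
    println("sequential: ${timeCollect(slowSource())}ms")
    // Buffered: producer and collector overlap, roughly 100 + 5 * 100 ms.
    println("buffered:   ${timeCollect(slowSource().buffer())}ms")
}
```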

[Figure: the Flow producer and consumer executing alternately]