Kotlin Coroutines: Flow (Part 1)

2023-07-16  Deck方

A Flow is a cold stream, similar to a sequence. "Cold" means lazy: the code inside the flow builder does not run until the flow is collected.
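To make the laziness concrete, here is a minimal sketch. The names `coldFlowDemo` and `log` are made up for illustration and are not part of any library. It records the order of events, showing that creating the flow runs nothing and that every `collect` call runs the builder body again from the start.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the order in which events happened.
fun coldFlowDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow<Int> {
        log += "builder started"   // runs only once someone collects
        emit(1)
        emit(2)
    }
    log += "flow created"          // the builder body has not run yet
    numbers.collect { log += "collected $it" }
    numbers.collect { log += "collected $it" }   // a second collection re-runs the builder
    log
}
```

Calling `coldFlowDemo().forEach(::println)` should list "flow created" first, followed by two identical runs of "builder started", "collected 1", "collected 2".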

1. Characteristics of a Flow

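// Imports assumed by the snippets in this article
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Intermediate operators such as filter and map are lazy; nothing runs until collect is called.
fun testFlow() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter { it % 2 == 0 }   // keep the even numbers
        .map { "string $it" }
        .collect { println("Collect $it") }
}
//print result
//Collect string 2
//Collect string 4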

2. Flow context

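// flowOn changes the context of the upstream flow only: the builder runs on Dispatchers.IO,
// while collect still runs in the caller's context (here the main thread of runBlocking).
fun testFlowOn() = runBlocking<Unit> {
    flow<Int> {
        println("start ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
        .flowOn(Dispatchers.IO + CoroutineName("IOWorker"))
        .collect {
            println("Collect $it ${Thread.currentThread().name}")
        }
}
//print result (coroutine names show up in thread names only in kotlinx.coroutines debug mode)
//start DefaultDispatcher-worker-1 @IOWorker#2
//Collect 1 main @coroutine#1
//Collect 2 main @coroutine#1
//Collect 3 main @coroutine#1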
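The reason `flowOn` exists is that a flow must emit in the same context in which it is collected. The sketch below, whose function names are invented for this illustration, shows what I would expect when the builder switches context with `withContext` instead: the emission is rejected with an `IllegalStateException` ("Flow invariant is violated"), while the `flowOn` version delivers the value normally.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits from inside withContext, which Flow does not allow.
fun emitFromOtherContextFails(): Boolean = try {
    runBlocking {
        flow<Int> {
            withContext(Dispatchers.Default) { emit(1) }   // emission leaves the collector's context
        }.collect { }
    }
    false
} catch (e: IllegalStateException) {
    true   // the flow invariant check rejected the emission
}

// Same work moved upstream with flowOn: the builder runs on Dispatchers.Default,
// and the value is still delivered in the caller's context.
fun emitWithFlowOn(): List<Int> = runBlocking {
    flow<Int> { emit(1) }.flowOn(Dispatchers.Default).toList()
}
```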

// launchIn(scope) is shorthand for scope.launch { flow.collect() }: it starts the collection
// in the given scope and returns a Job, so join() is needed to keep runBlocking waiting.
fun testLaunchIn() = runBlocking<Unit> {
    flow<Int> {
        println("start ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
        .flowOn(Dispatchers.IO + CoroutineName("IOWorker"))
        .onEach {
            println("onEach $it ${Thread.currentThread().name}")
        }
        .launchIn(CoroutineScope(Dispatchers.Default + CoroutineName("LaunchWorker")))
        .join()
}
//print result
//start DefaultDispatcher-worker-2 @IOWorker#3
//onEach 1 DefaultDispatcher-worker-1 @LaunchWorker#2
//onEach 2 DefaultDispatcher-worker-2 @LaunchWorker#2
//onEach 3 DefaultDispatcher-worker-1 @LaunchWorker#2
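Because `launchIn` returns immediately, code written after it runs before any element is processed. A minimal sketch of that ordering follows; `launchInDemo` and `log` are illustrative names, and the flow is launched in the `runBlocking` scope itself so that everything stays on one thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the order of events around launchIn.
fun launchInDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job: Job = flowOf(1, 2)
        .onEach { log += "onEach $it" }
        .launchIn(this)          // starts a new coroutine and returns at once
    log += "after launchIn"      // reached before any element is handled
    job.join()                   // wait for the collection to finish
    log
}
```

The returned `Job` can also be cancelled instead of joined, which stops the collection.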

3. Flow cancellation and cancellation checks

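// The flow { } builder already checks for cancellation on every emit, so cancellable() is
// redundant in this example. The operator matters for flows that skip that check, such as
// (1..5).asFlow(): a busy loop that never suspends would emit every value before noticing.
fun testCancel() = runBlocking<Unit> {
    flow<Int> {
        for (i in 1..5) {
            println("emit $i")
            emit(i)
        }
    }.cancellable()
        .collect {
            println(it)
            if (it == 3) {
                cancel()
            }
        }
}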
//print result
//emit 1
//1
//emit 2
//2
//emit 3
//3
//emit 4
//then runBlocking fails with a JobCancellationException
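The difference `cancellable()` makes is easiest to see on `asFlow()`, which does not check for cancellation between elements. The following is a rough sketch under that assumption; `collectUntilCancelled` is a made-up helper name. It cancels the surrounding scope at value 3 and returns the values the collector actually saw.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects 1..5, cancels at 3, and reports what was collected.
fun collectUntilCancelled(useCancellable: Boolean): List<Int> {
    val seen = mutableListOf<Int>()
    try {
        runBlocking {
            val source = (1..5).asFlow()
            val flow = if (useCancellable) source.cancellable() else source
            flow.collect {
                seen += it
                if (it == 3) cancel()   // cancels the runBlocking scope
            }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own scope
    }
    return seen
}
```

Without the operator the collector should still receive 4 and 5 after calling `cancel()`; with it, collection stops right after 3.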

4. Backpressure

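Backpressure is pressure that opposes the direction in which a fluid flows: <- (backpressure)  => (flow)
In the producer-consumer model (Flow), backpressure arises whenever the producer emits values faster than the consumer can process them.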
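Without any extra operator, a Flow absorbs this pressure by suspension: `emit` does not return until the collector has finished with the value, so the producer is slowed down to the consumer's pace. A small sketch of that default behaviour, with `sequentialDemo` and `log` as illustrative names:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records how producer and consumer interleave by default.
fun sequentialDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        for (i in 1..3) {
            log += "emit $i"
            emit(i)              // suspends until the collector block below returns
        }
    }.collect {
        delay(50)                // a slow consumer
        log += "collected $it"
    }
    log
}
```

The log strictly alternates between "emit" and "collected" entries, which is why the operators in the rest of this section are needed when the producer should not wait.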

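// requires: import kotlin.system.measureTimeMillis
// buffer() runs the producer and the collector in separate coroutines connected by a
// channel, so emissions no longer wait for the slow collector.
fun testBuffer() = runBlocking<Unit> {
    val duration = measureTimeMillis {
        flow<Int> {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            .buffer(10)
            .collect {
                delay(300)
                println("Collected ${Thread.currentThread().name}")
            }
    }
    println("total duration $duration")
}
//print result
//...
//total duration 1811
//The total stays below the 2000 ms (5 x (100 + 300) ms) that the same flow needs without buffer()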

//conflate() drops intermediate values: a slow collector always continues with the most recent one
fun testConflate() = runBlocking<Unit> {
    val duration = measureTimeMillis {
        flow<Int> {
            for (i in 1..5) {
                delay(100)
                println("emit  $i ")
                emit(i)
            }
        }
            .conflate()
            .collect {
                delay(300)
                println("Collected  $it in ${Thread.currentThread().name}")
            }
    }
    println("total duration $duration")
}
//emit  1 
//emit  2 
//emit  3 
//Collected  1 in main @coroutine#1
//emit  4 
//emit  5 
//Collected  3 in main @coroutine#1
//Collected  5 in main @coroutine#1
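//total duration 1249

//collectLatest cancels the collector block that is still running whenever a new value arrives
//and restarts it with that value, so here only the last value is processed to completion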
fun testCollectLatest() = runBlocking<Unit> {
    val duration = measureTimeMillis {
        flow<Int> {
            for (i in 1..4) {
                delay(100)
                println("emit  $i ")
                emit(i)
            }
        }
            .collectLatest {
                delay(300)
                println("Collected  $it in ${Thread.currentThread().name}")
            }
    }
    println("total duration $duration")
}
//print result 
//emit  1 
//emit  2 
//emit  3 
//emit  4 
//Collected  4 in main @coroutine#6
//total duration 940
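To compare the three strategies side by side, the sketch below runs the same producer against the same slow collector and returns the values whose processing ran to completion. All names (`ticks`, `withBuffer`, `withConflate`, `withCollectLatest`) and the 100 ms / 250 ms timings are assumptions chosen for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: one value every 100 ms.
fun ticks(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// buffer(): the producer runs ahead, every value is eventually processed.
fun withBuffer(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    ticks().buffer().collect { delay(250); done += it }
    done
}

// conflate(): values that arrive while the collector is busy are replaced by newer ones.
fun withConflate(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    ticks().conflate().collect { delay(250); done += it }
    done
}

// collectLatest: each new value cancels the block still working on the previous one.
fun withCollectLatest(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    ticks().collectLatest { delay(250); done += it }
    done
}
```

With these timings, `withBuffer()` returns all five values and `withCollectLatest()` only the last one. `withConflate()` starts with 1 and ends with 5, but which values are skipped in between depends on timing.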