协程Flow

2022-05-13  本文已影响0人  旺仔_100
一、介绍

Flow,在Kotlin协程当中是自成体系的知识点。简单的异步场景我们可以直接使用挂起函数、launch、async;复制的异步场景,我们可以使用Flow。Flow已经开始占领RxJava的领地,Flow还要取代LiveData了。Flow是很香的呀!

Flow就是"数据流"

我们首先创建数据,然后对数据做各种处理,最后结束数据流,拿到想要的结果。它跟Channel的区别是,Channel只能是发送数据和接受数据。而Flow是可以做中间的数据处理的。

fun main() = runBlocking {
    flow {            //上游,发源地
        emit(1) //挂起函数 发送数据
        emit(3)
        emit(5)
        emit(7)
        emit(9)
    }.filter { it > 3 }    //中间对数据处理
        .take(2)     //中间对数据处理
        .collect{          //下游  结束数据处理,拿到结果
            println(it)
        }
}

5
7

简单分析一下:

采用flow和list的使用几乎一样
///使用其他的方式创建flow
fun main() = runBlocking {
    flowOf(2,4,6,8,10).filter { it > 4 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }


    listOf(2,4,6,8,10).filter {i -> i > 4  }
        .map { it * 2 }.take(2).forEach{
        println(it)
    }
}

第三种创建flow,可以通过list来转换成flow,当然也可以通过flow转换成list

//flow转list,list转flow
fun main() = runBlocking {
    //Flow 转list
    flowOf(1,2,3,4,5).toList()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach {
            println(it)
        }

    listOf(1,2,3,4,5).asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }
}
中间操作符

中间操作符(Intermedite Operators),除了之前提到的map、filter这种从集合那边抄过来,还有一些特殊的操作符,他们是专门为Flow设计的。

Flow生命周期

在flow的中间操作符当中,onStart、onCompletion这两个是比较特殊的。他们是以操作符的形式存在,但实际上的作用,是监听生命周期回调。

onStart ,它的作用是注册一个监听事件:当flow启动以后,它就被回调。具体我们可以看下面这个例子:

fun main()= runBlocking {
    flowOf(1,2,3,4,5)
        .filter {
            println("filter $it")
            it > 2
        }
        .map {
            println("map : $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .onCompletion { println("onCompletion") }
        .collect{
            println("collect:$it")
        }
}

onStart和onComplete跟位置没有关系,但是filter,map这些操作符是和位置有关系的。onComplete在下面三种情况都会进行回调:
1.Flow正常执行完毕
2.Flow当中出现异常
3.Flow被取消

//协程cancel 和协程异常
fun main() = runBlocking {
    launch {
        flow {
            emit(1)
            emit(2)
            emit(3)
        }.onCompletion { print("onComplete first : $it") }
            .collect{
                println("collect : $it")
                if (it == 2){
                    cancel()
                    println("cancel")
                }
            }
    }

    delay(100L)
    flowOf(4,5,6)
        .onCompletion { println("onCompletion second : $it") }
        .collect{
            println("collect:$it")
            throw IllegalStateException()
        }
}
catch异常的处理

我们可以使用两种方式来处理:

如果觉得还要分两次抓,其实可以直接使用try catch把整个协程包起来

//异常处理
fun  main() = runBlocking {
//    try {
        flowOf(1,2,3,4)
            .filter { it / 0  == 1 }
        .catch {
            println("我抓到你了,但是我不处理")
        }
            .collect{
//                try {
//                    throw IllegalStateException()
                    println("collect $it")
//                }catch ( e : Exception){
//                    println("我抓到异常了")
//                }
            }
//    }catch (e : Exception ){
//        println("我抓到异常了吧")
//    }

}
切换Context:flowOn、launchIn

对于异步任务,我们经常需要频繁的切换线程,我们可以通过FlowOn来灵活实现。

fun  main() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
    flow
        .flowOn(Dispatchers.IO)
        .filter {
        logX("Filter:$it")
        it > 2
    }
        .flowOn(Dispatchers.Default)

        .collect{
            logX("Collect $it")
        }

}

FlowOn只能制定他上游的代码执行的线程。RxJava你还要搞清楚,观察者与被观察者,采用不同的方法切换线程。这个就是FlowOn去制定,是不是很方便!

当然collect里面的代码切换线程需要使用withContext{}.


fun  main() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
    flow
        .flowOn(Dispatchers.IO)
        .filter {
        logX("Filter:$it")
        it > 2
    }
        .flowOn(Dispatchers.Default)

        .collect{
            withContext(Dispatchers.IO){
                logX("Collect $it")
            }
        }

}

也可以使用withContext,但是并不推荐,可能出现问题,还有就是写法也不太好。

fun main() = runBlocking {
        val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
   withContext(Dispatchers.IO){
        flow.flowOn(Dispatchers.IO)
            .filter {
                logX("filter $it")
                it > 2
            }
            .collect{
                logX("logX : $it")
            }
   }
}

可以使用launchIn,更加优雅,但是launchIn里面调用了collect,它会结束整个数据流。


fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO)
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }

    flow.flowOn(Dispatchers.Default)
        .filter {
            logX("filter $it")
            it > 2
        }

        .onEach {
            logX("onEach $it")
        }
//        .launchIn(scope)

       scope.launch {
           flow.collect{
               logX("collect $it")
           }
       }

    delay(1000)

}

总结一下:我们在Flow中线程切换使用flowOn、launchIn、withContext,实际中使用flowOn、launchIn就可以满足需求了。

下游:终止操作符

它代表着数据流动的终止,无法在进行数据的处理。我们前面用到的collect就是一个终止操作符。还有first()、single()、fold{}、reduce{}等。

为什么说Flow是"冷"的?

//看下冷流和热流的区别
fun main() = runBlocking {
    //冷流  下面的代码不会执行
    val  flow = flow {
        (1..3).forEach {
            println("before sent $it")
            emit(it)
            println("Send $it")
        }

    }

    //热流
    val channel = produce<Int>(capacity = 0) {
        (1..3).forEach {
            println("channel Before sent $it")
            send(it)
            println("channel sent $it")
        }
    }
    println("end")
}

end
channel Before sent 1

flow里面的代码并没有走,channel里面的代码是有走的。所有channel被称为热流,是因为不过有没有接收方,发送方都会工作。
flow被称为冷流,是因为只有调用终止操作符之后,Flow才会开始工作。

fun main()= runBlocking {
    flow {
        println("emit : 3")
        emit(3)
        println("emit : 4")
        emit(4)
        println("emit : 5")
        emit(5)
    }.filter {
        println("filter: $it")
        it > 2
    }.map {
        println("map:$it")
        it * 2
    }.collect{
        println("collect : $it")
    }
}

emit : 3
filter: 3
map:3
collect : 6
emit : 4
filter: 4
map:4
collect : 8
emit : 5
filter: 5
map:5
collect : 10

从上面的输出结果可以看出,flow是一条一条的处理数据的。而不是批量做同一个操作。所以这可以看出,flow “懒”的特性。

使用flow来做网络请求

///模拟下网络请求,使用flow来写  这里报错,Dispatchers.Main必须在android项目里面才有
//fun main() =  runBlocking{
//        fun  laodData() = flow {
//            repeat(3){
//                delay(100L)
//                emit(it)
//                logX("emit :$it")
//            }
//        }
//
//        fun updateUI(it: Int){  logX("updateUI $it")
//        }
//        fun showLoading(){
//            println("showLoading")}
//        fun hideLoading(){
//            println("hideLoading")
//        }
//        val uiScope = CoroutineScope(Dispatchers.Main)
//
//        laodData().onStart { showLoading() }
//            .map { it * 2 }
//            .flowOn(Dispatchers.IO)
//            .catch { cause: Throwable -> println(cause)
//                hideLoading()
//                emit(-1)}
//            .onEach { updateUI(it) }
//            .onCompletion { hideLoading() }
//            .launchIn(uiScope)
//
//        delay(10000L)
//
//    }


I/System.out: ============================================================
I/System.out: emit :0
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 0
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 2
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :1
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :2
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 4
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: hideLoading
总结
上一篇 下一篇

猜你喜欢

热点阅读