【Koltin Flow(三)】Flow操作符之中间操作符(三)

2022-08-03  本文已影响0人  MakerGaoGao

目录

【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow

前言

  1. 本篇主要介绍中间操作符的功能性操作符以及一些其他操作符。
  2. 如果对其他的操作符或者flow基本知识不太了解,可参考目录的其他篇内容作为参考。

功能操作符

1. retry、retryWhen retry为retryWhen的简化版本,可设置重试次数,以及在闭包内重试开关。

retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。

代码如下:
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry(2).catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry(2)  $it")
            }
            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry {
                it is RuntimeException
            }.catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry{}  $it")
            }


            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retryWhen { cause, attempt ->
                Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
                cause is RuntimeException
            } .catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retryWhen  $it")
            }

日志如下:
2022-08-02 10:26:55.301 4775-4801/edu.test.demo D/Test-TAG: retry(2)  100
2022-08-02 10:26:55.302 4775-4801/edu.test.demo D/Test-TAG: retry{}  100
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: retryWhen  100
分析:
2. cancellable 设置了之后则cancel()取消了flow值得发送,但是有个特殊情况,asFlow的时候需要这个cancellable设置,但是flow{}直接创建出来的flow则不需要设置,原因在下面的分析中看源码说明。
代码如下:

代码1

            (1..10).asFlow().cancellable().catch {
                Log.e(TAG.TAG,"ex is $it")
            }.collect {
                if (it == 5){
                    cancel()
                }
                Log.d(TAG.TAG, " (1..10).asFlow() cancellable $it")
            }

代码2

         flow {
                repeat(10){
                    emit(it)
                }
            }.collect {
                if (it == 5){
                    cancel()
                }
                Log.d(TAG.TAG, "flow{} cancellable $it")
            }
日志如下:

日志1

2022-08-02 11:03:27.029 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 1
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 2
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 3
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 4
2022-08-02 11:03:27.035 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 5

日志2

2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 0
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 1
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 3
2022-08-02 11:10:51.502 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 4
2022-08-02 11:10:51.505 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 5
分析:
/**
 * Creates a flow that produces values from the range.
 */
public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

不是unsafeFlow呀,直接也是flow呀,这里注意导入:

import kotlinx.coroutines.flow.internal.unsafeFlow as flow

看出来了吧,是unsafeFlow...,到这里asFlow就解释的差不多了。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow,是不是从名字也可以看出一些端倪,如果不清楚在继续往下看:

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>{
    ......
}

看到了吧,CancellableFlow,可取消的flow,再看

/**
 * Internal marker for flows that are [cancellable].
 */
internal interface CancellableFlow<out T> : Flow<T>

是不是更清楚了,flow{},生成的本身就是CancellableFlow,到此处flow{},也解释的差不多了。

public fun <T> Flow<T>.cancellable(): Flow<T> =
    when (this) {
        is CancellableFlow<*> -> this // Fast-path, already cancellable
        else -> CancellableFlowImpl(this)
    }

两种情况,一种本身就是CancellableFlow,直接返回自己,另外一种,则创建CancellableFlowImpl,再看看CancellableFlowImpl:

private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect {
            currentCoroutineContext().ensureActive()
            collector.emit(it)
        }
    }
}

可以看到最后得出的也是CancellableFlow。

3. flowOn flowOn用于切换flow上游的执行线程
flowOn的取值共有四种:Dispatchers.IO、Dispatchers.Unconfined、Dispatchers.Main、Dispatchers.Default
代码如下:
            flow {
                Log.d(TAG.TAG,"Default-emit current is ${Thread.currentThread().name}")
                emit(10)
            }.flowOn(Dispatchers.Default).collect {
                Log.d(TAG.TAG,"Default-collect current is ${Thread.currentThread().name}")
            }

            flow {
                Log.d(TAG.TAG,"Main-emit current is ${Thread.currentThread().name}")
                emit(10)
            }.flowOn(Dispatchers.Main).collect {
                Log.d(TAG.TAG,"Main-collect current is ${Thread.currentThread().name}")
            }
日志如下:
2022-08-02 11:44:26.978 7102-7128/edu.test.demo D/Test-TAG: Default-emit current is DefaultDispatcher-worker-1
2022-08-02 11:44:26.985 7102-7128/edu.test.demo D/Test-TAG: Default-collect current is DefaultDispatcher-worker-1
2022-08-02 11:44:27.115 7102-7102/edu.test.demo D/Test-TAG: Main-emit current is main
2022-08-02 11:44:27.118 7102-7128/edu.test.demo D/Test-TAG: Main-collect current is DefaultDispatcher-worker-1
分析:
4. buffer、conflate 这两个操作符主要处理背压的问题,本篇这里不做展开,在背压的部分会展开说明。

其他操作符

1. produceIn、receiveAsFlow、consumeAsFlow flow和channel之间转换相关,后续进行说明。
2. asSharedFlow、asStateFlow、shareIn、stateIn SharedFlow和StateFlow相关,后续进行说明。

总结

上一篇下一篇

猜你喜欢

热点阅读