Android进阶笔记

干掉RxJava系列--2. 手写FlowBus替代RxBus/

2021-12-31  本文已影响0人  今阳说

LiveData的不足

  1. LiveData只能在主线程更新数据(postValue底层也是切换到主线程的,而且可能会有丢数据的问题);
  2. LiveData操作符不够强大, 对于较为复杂的交互数据流场景,建议使用 RxJava 或 Flow;
  3. LiveData与Android平台紧密相连,虽然LiveData在表现层中运行良好,但它并不适合领域层,因为领域层最好是独立于平台的;

RxJava的不足

  1. 强大意味着复杂,其繁多的操作符简直是初学者的噩梦;
  2. 它是非官方的,google自然也就不会花大力气去推广和优化;
  3. 为项目的包体积带来了额外的增加;

Flow

Flow的一些常用操作符

//        val flow = flowOf(1,2,3,4,5)
//        val flow: Flow<Int> = flow {
//            List(20) {
//                emit(it)//发送数据
//                delay(300)
//            }
//        }
val flow = (1..10).asFlow()
lifecycleScope.launch {
    flow.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器,设置的调度器只对它之前的操作有影响
        .onStart { log("onStart") }
        .flowOn(Dispatchers.Main)
        .onEach {
            log("onEach:$it")
            delay(300)
        }
        .filter {//过滤
            it % 2 == 0
        }
        .map {//变换
            log("map:$it*$it")
            it * it
        }
        .transform<Int,String> {
            "num=$it"
//                    emit("num1=$it")
//                    emit("num2=$it")
        }
        .flowOn(Dispatchers.IO)
        .onCompletion {//订阅流的完成,执行在流完成时的逻辑
            log("onCompletion: $it")
        }
        .catch {//捕获 Flow 的异常,catch 函数只能捕获它的上游的异常
            log("catch: $it")
        }
        .flowOn(Dispatchers.Main)
        .collect {//消费Flow
            log("collect1_1: $it")
        }
    //Flow 可以被重复消费
    flow.collect { log("collect1_2: $it") }
    //除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。
    // 这样消费的具体操作就不需要与末端操作符放到一起,collect 函数可以放到其他任意位置调用
    flow.onEach {
        log("onEach2:$it")
    }
    withContext(Dispatchers.IO) {
        delay(1000)
        flow.collect()
    }
    //除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
    flow.onEach {
        log("onEach2:$it")
    }.launchIn(CoroutineScope(Dispatchers.IO))
        .join()//主线程等待这个协程执行结束

Flow的取消

lifecycleScope.launch(Dispatchers.IO) {
    val flow2 = (1..10).asFlow().onEach { delay(1000) }
    val job: Job = lifecycleScope.launch {
        log("lifecycleScope.launch")
        flow2.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器
            .collect {//消费Flow
                log("flow2:$it")
            }
    }
    delay(2000)
    job.cancelAndJoin()
}

Flow 的背压

//为 Flow 添加缓冲
flow {
    List(5) {
        emit(it)
    }
}.buffer().collect {
    log("flow buffer collect:$it")
}
flow {
    List(10) {
        emit(it)
    }
}
.conflate()
.collect { value ->
    log("flow conflate Collecting $value")
    delay(100)
    log("$value collected flow conflate ")
}
flow {
    List(10) {
        emit(it)
    }
}.collectLatest { value ->
    log("flow collectLatest Collecting $value")
    delay(100)
    log("$value collected flow collectLatest ")
}

使用更为安全的方式收集 Android UI 数据流

lifecycleScope.launch {
    delay(500)
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect { log("collect2: $it") }
    }
}
lifecycleScope.launchWhenStarted {
    delay(1000)
    flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
    delay(1500)
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
        .collect { log("collect4: $it") }
}

SharedFlow

public fun <T> MutableSharedFlow(
    replay: Int = 0,//当新的订阅者Collect时,发送几个已经发送过的数据给它,默认为0,即默认新订阅者不会获取以前的数据
    extraBufferCapacity: Int = 0,//表示减去replay,MutableSharedFlow还缓存多少数据,默认为0
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示缓存策略,即缓冲区满了之后Flow如何处理
    //BufferOverflow.SUSPEND 策略,也就是挂起策略, 默认为挂起
    //BufferOverflow.DROP_OLDEST: 丢弃旧数据
    //BufferOverflow.DROP_LATEST: 丢弃最新的数据
)
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    sharedFlow.emit("aaa")
    delay(1000)
    sharedFlow.emit("bbb")
    delay(1000)
    sharedFlow.emit("ccc")
}
lifecycleScope.launch {
    delay(500)
    sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
    delay(3500)
    sharedFlow.collect { log("collect4:$it") }
}
lifecycleScope.launch {
    (1..5).asFlow().shareIn(
        //1. 共享开始时所在的协程作用域范围
        scope = lifecycleScope,
        //2. 控制共享的开始和结束的策略
        // started = SharingStarted.Lazily,//当首个订阅者出现时开始,在scope指定的作用域被结束时终止
        // started = SharingStarted.Eagerly,//立即开始,而在scope指定的作用域被结束时终止
        //对于那些只执行一次的操作,您可以使用Lazily或者Eagerly。然而,如果您需要观察其他的流,就应该使用WhileSubscribed来实现细微但又重要的优化工作
        //WhileSubscribed策略会在没有收集器的情况下取消上游数据流
        started = SharingStarted.WhileSubscribed(
            500,//stopTimeoutMillis 控制一个以毫秒为单位的延迟值,指的是最后一个订阅者结束订阅与停止上游流的时间差。默认值是 0(比如当用户旋转设备时,原来的视图会先被销毁,然后数秒钟内重建)
            Long.MAX_VALUE//replayExpirationMillis表示数据重播的过时时间,如果用户离开应用太久,此时您不想让用户看到陈旧的数据,你可以用到这个参数
        ),
        //3. 状态流的重播个数
        replay = 0
    ).collect { log("shareIn.collect:$it") }
}

StateFlow

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
1. 它始终是有值的。
2. 它的值是唯一的。
3. 它允许被多个观察者共用 (因此是共享的数据流)。
4. 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。
log("StateFlow 默认值:111")
val stateFlow = MutableStateFlow("111")

lifecycleScope.launch {
    delay(500)
    stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    stateFlow.collect { log("StateFlow collect3:$it") }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(5000)
    log("StateFlow re emit:111")
    stateFlow.emit("111")
    delay(1000)
    log("StateFlow emit:222")
    stateFlow.emit("222")
}
val stateFlow2: StateFlow<Int> = flow {
    List(10) {
        delay(300)
        emit(it)
    }
}.stateIn(
    scope = lifecycleScope,
    started = WhileSubscribed(5000),//等待5秒后仍然没有订阅者存在就终止协程
    initialValue = 666//默认值
)
lifecycleScope.launchWhenStarted {//STARTED状态时会开始收集流,并且在RESUMED状态时保持收集,进入STOPPED状态时结束收集过程
    stateFlow2.collect { log("StateFlow shareIn.collect:$it") }

}

StateFlow与SharedFlow 的区别

  1. SharedFlow配置更为灵活,支持配置replay,缓冲区大小等,StateFlow是SharedFlow的特化版本,replay固定为1,缓冲区大小默认为0;
  2. StateFlow与LiveData类似,支持通过myFlow.value获取当前状态,如果有这个需求,必须使用StateFlow;
  3. SharedFlow支持发出和收集重复值,而StateFlow当value重复时,不会回调collect;
  4. 对于新的订阅者,StateFlow只会重播当前最新值,SharedFlow可配置重播元素个数(默认为0,即不重播);

基于SharedFlow封装FlowBus

创建消息类EventMessage

class EventMessage {
    /**
     * 消息的key
     */
    var key: Int

    /**
     * 消息的主体message
     */
    var message: Any? = null
    private var messageMap: HashMap<String, Any?>? = null

    constructor(key: Int, message: Any?) {
        this.key = key
        this.message = message
    }

    constructor(key: Int) {
        this.key = key
    }

    fun put(key: String, message: Any?) {
        if (messageMap == null) {
            messageMap = HashMap<String, Any?>()
        }
        messageMap?.set(key, message)
    }

    operator fun <T> get(key: String?): T? {
        if (messageMap != null) {
            try {
                return messageMap!![key] as T?
            } catch (e: ClassCastException) {
                e.printStackTrace()
            }
        }
        return null
    }
}

创建FlowBus

class FlowBus : ViewModel() {
    companion object {
        val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
    }

    //正常事件
    private val events = mutableMapOf<String, Event<*>>()

    //粘性事件
    private val stickyEvents = mutableMapOf<String, Event<*>>()

    fun with(key: String, isSticky: Boolean = false): Event<Any> {
        return with(key, Any::class.java, isSticky)
    }

    fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
        return with(eventType.name, eventType, isSticky)
    }

    @Synchronized
    fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
        val flows = if (isSticky) stickyEvents else events
        if (!flows.containsKey(key)) {
            flows[key] = Event<T>(key, isSticky)
        }
        return flows[key] as Event<T>
    }


    class Event<T>(private val key: String, isSticky: Boolean) {

        // private mutable shared flow
        private val _events = MutableSharedFlow<T>(
            replay = if (isSticky) 1 else 0,
            extraBufferCapacity = Int.MAX_VALUE
        )

        // publicly exposed as read-only shared flow
        val events = _events.asSharedFlow()

        /**
         * need main thread execute
         */
        fun observeEvent(
            lifecycleOwner: LifecycleOwner,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
            minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
            action: (t: T) -> Unit
        ) {
            lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                override fun onDestroy(owner: LifecycleOwner) {
                    super.onDestroy(owner)
                    LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                    val subscriptCount = _events.subscriptionCount.value
                    if (subscriptCount <= 0)
                        instance.events.remove(key)
                }
            })
            lifecycleOwner.lifecycleScope.launch(dispatcher) {
                lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            LjyLogUtil.d("ker=$key , error=${e.message}")
                        }
                    }
                }
            }
        }

        /**
         * send value
         */
        suspend fun setValue(
            event: T,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
        ) {
            withContext(dispatcher) {
                _events.emit(event)
            }

        }
    }
}

使用FlowBus

FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
    withContext(Dispatchers.Main) {//不创建新的协程,指定协程上运行代码块,可以切换线程
        FlowBus.instance.with(EventMessage::class.java)
            .observeEvent(this@EventBusActivity) {
                LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
            }
    }
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {

    val event = EventMessage(111)
    LjyLogUtil.d(
        "FlowBus:send1_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
    delay(2000)
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(101))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(102))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(103))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(104))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
    delay(4000)
    val event = EventMessage(222, "bbb")
    LjyLogUtil.d(
        "FlowBus:send2_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
    delay(6000)
    withContext(Dispatchers.Main) {
        val event = EventMessage(333, "ccc")
        event.put("key1", 123)
        event.put("key2", "abc")
        LjyLogUtil.d(
            "FlowBus:send3_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
}

进一步优化

//利用扩展函数
fun LifecycleOwner.observeEvent(
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    isSticky: Boolean = false,
    action: (t: EventMessage) -> Unit
) {
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .with(EventMessage::class.java, isSticky = isSticky)
        .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}

fun postValue(
    event: EventMessage,
    delayTimeMillis: Long = 0,
    isSticky: Boolean = false,
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
    LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .viewModelScope
        .launch(dispatcher) {
            delay(delayTimeMillis)
            ApplicationScopeViewModelProvider
                .getApplicationScopeViewModel(FlowBus::class.java)
                .with(EventMessage::class.java, isSticky = isSticky)
                .setValue(event)
        }
}

private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {

    private val eventViewModelStore: ViewModelStore = ViewModelStore()

    override fun getViewModelStore(): ViewModelStore {
        return eventViewModelStore
    }

    private val mApplicationProvider: ViewModelProvider by lazy {
        ViewModelProvider(
            ApplicationScopeViewModelProvider,
            ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
        )
    }

    fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
        return mApplicationProvider[modelClass]
    }

}

object FlowBusInitializer {
    lateinit var application: Application
    //在Application中初始化
    fun init(application: Application) {
        FlowBusInitializer.application = application
    }
}
lifecycleScope.launch(Dispatchers.IO) {
    observeEvent {
        LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    observeEvent(Dispatchers.IO) {
        LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }

    observeEvent(Dispatchers.Main) {
        LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    postValue(EventMessage(100))
    postValue(EventMessage(101), 1000)
    postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
    val event3 = EventMessage(103, "ccc")
    event3.put("key1", 123)
    event3.put("key2", "abc")
    postValue(event3, 2000, dispatcher = Dispatchers.Main)
}

参考

我是今阳,如果想要进阶和了解更多的干货,欢迎关注微信公众号 “今阳说” 接收我的最新文章

上一篇下一篇

猜你喜欢

热点阅读