干掉RxJava系列--2. 手写FlowBus替代RxBus/
2021-12-31 本文已影响0人
今阳说
LiveData的不足
- LiveData 是一个专用于 Android 的具备自主生命周期感知能力的可观察的数据存储器类,被有意简化设计,这使得开发者很容易上手,但其不足有如下两点:
- LiveData只能在主线程更新数据(postValue底层也是切换到主线程的,而且可能会有丢数据的问题);
- LiveData操作符不够强大, 对于较为复杂的交互数据流场景,建议使用 RxJava 或 Flow;
- LiveData与Android平台紧密相连,虽然LiveData在表现层中运行良好,但它并不适合领域层,因为领域层最好是独立于平台的;
- LiveData 对于 Java 开发者、初学者或是一些简单场景而言仍是可行的解决方案。对于MVVM架构而言,View和ViewModel之间可以通过LiveData交互(看了下面就知道其实也可以用StateFlow), ViewModel和Repository之间就可以通过Flow交互;
RxJava的不足
- RxJava还是相当强大的,基于事件流的链式调用,进行耗时任务,线程切换,是一个很好的异步操作库, 但是对于Android开发来说其也有一些不足之处
- 强大意味着复杂,其繁多的操作符简直是初学者的噩梦;
- 它是非官方的,google自然也就不会花大力气去推广和优化;
- 为项目的包体积带来了额外的增加;
Flow
- Flow 是一种 "冷流"(Cold Stream)。"冷流" 是一种数据源,该类数据源的生产者会在每个监听者开始消费事件的时候执行(即不消费则不生产数据,而LiveData的发送端并不依赖于接收端),从而在每个订阅上创建新的数据流(有多个订阅者的时候,他们各自的事件是独立的)。一旦消费者停止监听或者生产者的阻塞结束,数据流将会被自动关闭。
- Flow 是 Kotlin 协程与响应式编程模型结合的产物,支持线程切换、背压,通过协程取消功能提供自动清理功能,因此倾向于执行一些重型任务。
- 使用 take, first, toList 等操作符可以简化 Flow 的相关代码测试。
- Flow本身并不了解Android的生命周期,也不提供Android生命周期状态变化时收集器的自动暂停和恢复,可以使用LifecycleCoroutineScope的扩展,如 launchWhenStarted来启动coroutine来收集我们的Flow--这些收集器将自动暂停,并与组件的Lifecycle同步恢复。
- 相较于 Channel,Flow 末端操作符 会触发数据流的执行,同时会根据生产者一侧流操作来决定是成功完成操作还是抛出异常,因此 Flows 会自动地关闭数据流,不会在生产者一侧泄漏资源;而一旦 Channel 没有正确关闭,生产者可能不会清理大型资源,因此 Channels 更容易造成资源泄漏。
Flow的一些常用操作符
// val flow = flowOf(1,2,3,4,5)
// val flow: Flow<Int> = flow {
// List(20) {
// emit(it)//发送数据
// delay(300)
// }
// }
val flow = (1..10).asFlow()
lifecycleScope.launch {
flow.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器,设置的调度器只对它之前的操作有影响
.onStart { log("onStart") }
.flowOn(Dispatchers.Main)
.onEach {
log("onEach:$it")
delay(300)
}
.filter {//过滤
it % 2 == 0
}
.map {//变换
log("map:$it*$it")
it * it
}
.transform<Int,String> {
"num=$it"
// emit("num1=$it")
// emit("num2=$it")
}
.flowOn(Dispatchers.IO)
.onCompletion {//订阅流的完成,执行在流完成时的逻辑
log("onCompletion: $it")
}
.catch {//捕获 Flow 的异常,catch 函数只能捕获它的上游的异常
log("catch: $it")
}
.flowOn(Dispatchers.Main)
.collect {//消费Flow
log("collect1_1: $it")
}
//Flow 可以被重复消费
flow.collect { log("collect1_2: $it") }
//除了可以在 collect 处消费 Flow 的元素以外,还可以通过 onEach 来做到这一点。
// 这样消费的具体操作就不需要与末端操作符放到一起,collect 函数可以放到其他任意位置调用
flow.onEach {
log("onEach2:$it")
}
withContext(Dispatchers.IO) {
delay(1000)
flow.collect()
}
//除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
flow.onEach {
log("onEach2:$it")
}.launchIn(CoroutineScope(Dispatchers.IO))
.join()//主线程等待这个协程执行结束
Flow的取消
lifecycleScope.launch(Dispatchers.IO) {
val flow2 = (1..10).asFlow().onEach { delay(1000) }
val job: Job = lifecycleScope.launch {
log("lifecycleScope.launch")
flow2.flowOn(Dispatchers.IO)//设定它运行时所使用的调度器
.collect {//消费Flow
log("flow2:$it")
}
}
delay(2000)
job.cancelAndJoin()
}
Flow 的背压
- 只要是响应式编程,就一定会有背压问题,我们先来看看背压究竟是什么。
- 背压问题在生产者的生产速率高于消费者的处理速率的情况下出现。为了保证数据不丢失,我们也会考虑添加缓存来缓解问题:
//为 Flow 添加缓冲
flow {
List(5) {
emit(it)
}
}.buffer().collect {
log("flow buffer collect:$it")
}
- 也可以为 buffer 指定一个容量。不过,如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压。
- 问题产生的根本原因是生产和消费速率的不匹配,除直接优化消费者的性能以外,我们也可以采取一些取舍的手段。
- 第一种是 conflate。与 Channel 的 Conflate 模式一致,新数据会覆盖老数据,
flow {
List(10) {
emit(it)
}
}
.conflate()
.collect { value ->
log("flow conflate Collecting $value")
delay(100)
log("$value collected flow conflate ")
}
- 第二种是 collectLatest。顾名思义,只处理最新的数据,这看上去似乎与 conflate 没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消除 collectLatest 之外还有 mapLatest、flatMapLatest 等等,都是这个作用。
flow {
List(10) {
emit(it)
}
}.collectLatest { value ->
log("flow collectLatest Collecting $value")
delay(100)
log("$value collected flow collectLatest ")
}
使用更为安全的方式收集 Android UI 数据流
- 在 Android 开发中,请使用 LifecycleOwner.addRepeatingJob、suspend Lifecycle.repeatOnLifecycle 或 Flow.flowWithLifecycle 从 UI 层安全地收集数据流。(lifecycle-runtime-ktx:2.4.+ 库中所提供的)
lifecycleScope.launch {
delay(500)
repeatOnLifecycle(Lifecycle.State.STARTED) {
flow.collect { log("collect2: $it") }
}
}
lifecycleScope.launchWhenStarted {
delay(1000)
flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
delay(1500)
flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
.collect { log("collect4: $it") }
}
SharedFlow
- 冷流和订阅者只能是一对一的关系,当我们要实现一个流,多个订阅者的需求时,就需要热流了,SharedFlow就是一种热流
- 其构造函数如下
public fun <T> MutableSharedFlow(
replay: Int = 0,//当新的订阅者Collect时,发送几个已经发送过的数据给它,默认为0,即默认新订阅者不会获取以前的数据
extraBufferCapacity: Int = 0,//表示减去replay,MutableSharedFlow还缓存多少数据,默认为0
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示缓存策略,即缓冲区满了之后Flow如何处理
//BufferOverflow.SUSPEND 策略,也就是挂起策略, 默认为挂起
//BufferOverflow.DROP_OLDEST: 丢弃旧数据
//BufferOverflow.DROP_LATEST: 丢弃最新的数据
)
- 简单使用如下
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
delay(1000)
sharedFlow.emit("aaa")
delay(1000)
sharedFlow.emit("bbb")
delay(1000)
sharedFlow.emit("ccc")
}
lifecycleScope.launch {
delay(500)
sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
delay(1500)
sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
delay(2500)
sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
delay(3500)
sharedFlow.collect { log("collect4:$it") }
}
- 将冷流Flow转化为SharedFlow
lifecycleScope.launch {
(1..5).asFlow().shareIn(
//1. 共享开始时所在的协程作用域范围
scope = lifecycleScope,
//2. 控制共享的开始和结束的策略
// started = SharingStarted.Lazily,//当首个订阅者出现时开始,在scope指定的作用域被结束时终止
// started = SharingStarted.Eagerly,//立即开始,而在scope指定的作用域被结束时终止
//对于那些只执行一次的操作,您可以使用Lazily或者Eagerly。然而,如果您需要观察其他的流,就应该使用WhileSubscribed来实现细微但又重要的优化工作
//WhileSubscribed策略会在没有收集器的情况下取消上游数据流
started = SharingStarted.WhileSubscribed(
500,//stopTimeoutMillis 控制一个以毫秒为单位的延迟值,指的是最后一个订阅者结束订阅与停止上游流的时间差。默认值是 0(比如当用户旋转设备时,原来的视图会先被销毁,然后数秒钟内重建)
Long.MAX_VALUE//replayExpirationMillis表示数据重播的过时时间,如果用户离开应用太久,此时您不想让用户看到陈旧的数据,你可以用到这个参数
),
//3. 状态流的重播个数
replay = 0
).collect { log("shareIn.collect:$it") }
}
StateFlow
- StateFlow继承于SharedFlow,是SharedFlow的一个特殊变种
- 构造函数如下,只需要传入一个默认值
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
- StateFlow本质上是一个replay为1,并且没有缓冲区的SharedFlow,因此第一次订阅时会先获得默认值
- StateFlow仅在值已更新,并且值发生了变化时才会返回,即如果更新后的值没有变化,也不会回调Collect方法,这点与LiveData不同
- StateFlow 与 LiveData是最接近的,因为:
1. 它始终是有值的。
2. 它的值是唯一的。
3. 它允许被多个观察者共用 (因此是共享的数据流)。
4. 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。
- 简单使用
log("StateFlow 默认值:111")
val stateFlow = MutableStateFlow("111")
lifecycleScope.launch {
delay(500)
stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
delay(1500)
stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
delay(2500)
stateFlow.collect { log("StateFlow collect3:$it") }
}
lifecycleScope.launch(Dispatchers.IO) {
delay(5000)
log("StateFlow re emit:111")
stateFlow.emit("111")
delay(1000)
log("StateFlow emit:222")
stateFlow.emit("222")
}
- 普通流Flow转化成StateFlow
val stateFlow2: StateFlow<Int> = flow {
List(10) {
delay(300)
emit(it)
}
}.stateIn(
scope = lifecycleScope,
started = WhileSubscribed(5000),//等待5秒后仍然没有订阅者存在就终止协程
initialValue = 666//默认值
)
lifecycleScope.launchWhenStarted {//STARTED状态时会开始收集流,并且在RESUMED状态时保持收集,进入STOPPED状态时结束收集过程
stateFlow2.collect { log("StateFlow shareIn.collect:$it") }
}
StateFlow与SharedFlow 的区别
- SharedFlow配置更为灵活,支持配置replay,缓冲区大小等,StateFlow是SharedFlow的特化版本,replay固定为1,缓冲区大小默认为0;
- StateFlow与LiveData类似,支持通过myFlow.value获取当前状态,如果有这个需求,必须使用StateFlow;
- SharedFlow支持发出和收集重复值,而StateFlow当value重复时,不会回调collect;
- 对于新的订阅者,StateFlow只会重播当前最新值,SharedFlow可配置重播元素个数(默认为0,即不重播);
基于SharedFlow封装FlowBus
创建消息类EventMessage
class EventMessage {
/**
* 消息的key
*/
var key: Int
/**
* 消息的主体message
*/
var message: Any? = null
private var messageMap: HashMap<String, Any?>? = null
constructor(key: Int, message: Any?) {
this.key = key
this.message = message
}
constructor(key: Int) {
this.key = key
}
fun put(key: String, message: Any?) {
if (messageMap == null) {
messageMap = HashMap<String, Any?>()
}
messageMap?.set(key, message)
}
operator fun <T> get(key: String?): T? {
if (messageMap != null) {
try {
return messageMap!![key] as T?
} catch (e: ClassCastException) {
e.printStackTrace()
}
}
return null
}
}
创建FlowBus
class FlowBus : ViewModel() {
companion object {
val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
}
//正常事件
private val events = mutableMapOf<String, Event<*>>()
//粘性事件
private val stickyEvents = mutableMapOf<String, Event<*>>()
fun with(key: String, isSticky: Boolean = false): Event<Any> {
return with(key, Any::class.java, isSticky)
}
fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
return with(eventType.name, eventType, isSticky)
}
@Synchronized
fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
val flows = if (isSticky) stickyEvents else events
if (!flows.containsKey(key)) {
flows[key] = Event<T>(key, isSticky)
}
return flows[key] as Event<T>
}
class Event<T>(private val key: String, isSticky: Boolean) {
// private mutable shared flow
private val _events = MutableSharedFlow<T>(
replay = if (isSticky) 1 else 0,
extraBufferCapacity = Int.MAX_VALUE
)
// publicly exposed as read-only shared flow
val events = _events.asSharedFlow()
/**
* need main thread execute
*/
fun observeEvent(
lifecycleOwner: LifecycleOwner,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
action: (t: T) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
override fun onDestroy(owner: LifecycleOwner) {
super.onDestroy(owner)
LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
val subscriptCount = _events.subscriptionCount.value
if (subscriptCount <= 0)
instance.events.remove(key)
}
})
lifecycleOwner.lifecycleScope.launch(dispatcher) {
lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
events.collect {
try {
action(it)
} catch (e: Exception) {
LjyLogUtil.d("ker=$key , error=${e.message}")
}
}
}
}
}
/**
* send value
*/
suspend fun setValue(
event: T,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
) {
withContext(dispatcher) {
_events.emit(event)
}
}
}
}
使用FlowBus
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
withContext(Dispatchers.Main) {//不创建新的协程,指定协程上运行代码块,可以切换线程
FlowBus.instance.with(EventMessage::class.java)
.observeEvent(this@EventBusActivity) {
LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
}
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {
val event = EventMessage(111)
LjyLogUtil.d(
"FlowBus:send1_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
delay(2000)
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(101))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(102))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(103))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(104))
FlowBus.instance.with(EventMessage::class.java)
.setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
delay(4000)
val event = EventMessage(222, "bbb")
LjyLogUtil.d(
"FlowBus:send2_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
delay(6000)
withContext(Dispatchers.Main) {
val event = EventMessage(333, "ccc")
event.put("key1", 123)
event.put("key2", "abc")
LjyLogUtil.d(
"FlowBus:send3_${Thread.currentThread().name}_${
GsonUtils.toJson(
event
)
}"
)
FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
}
进一步优化
- 利用扩展函数,ViewModelStoreOwner,及预传EventMessage::class.javas是当前项目中的使用更加简单
//利用扩展函数
fun LifecycleOwner.observeEvent(
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
isSticky: Boolean = false,
action: (t: EventMessage) -> Unit
) {
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.with(EventMessage::class.java, isSticky = isSticky)
.observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}
fun postValue(
event: EventMessage,
delayTimeMillis: Long = 0,
isSticky: Boolean = false,
dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.viewModelScope
.launch(dispatcher) {
delay(delayTimeMillis)
ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(FlowBus::class.java)
.with(EventMessage::class.java, isSticky = isSticky)
.setValue(event)
}
}
private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {
private val eventViewModelStore: ViewModelStore = ViewModelStore()
override fun getViewModelStore(): ViewModelStore {
return eventViewModelStore
}
private val mApplicationProvider: ViewModelProvider by lazy {
ViewModelProvider(
ApplicationScopeViewModelProvider,
ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
)
}
fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
return mApplicationProvider[modelClass]
}
}
object FlowBusInitializer {
lateinit var application: Application
//在Application中初始化
fun init(application: Application) {
FlowBusInitializer.application = application
}
}
- 使用
lifecycleScope.launch(Dispatchers.IO) {
observeEvent {
LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
observeEvent(Dispatchers.IO) {
LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
observeEvent(Dispatchers.Main) {
LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
}
lifecycleScope.launch(Dispatchers.IO) {
delay(1000)
postValue(EventMessage(100))
postValue(EventMessage(101), 1000)
postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
val event3 = EventMessage(103, "ccc")
event3.put("key1", 123)
event3.put("key2", "abc")
postValue(event3, 2000, dispatcher = Dispatchers.Main)
}
参考
- StateFlow 和 SharedFlow
- 协程 Flow 最佳实践 | 基于 Android 开发者峰会应用
- 从 LiveData 迁移到 Kotlin 数据流
- 官方推荐Flow,LiveData:那我走?
- 谷歌推荐 Flow 取代 LiveData,真香?
- 谁能取代Android的LiveData- StateFlow or SharedFlow?
- 【Kotlin】就几行代码?! 用SharedFlow写个FlowEventBus