KotlinRxJavaKotlin

基于Kotlin实现的三种 RxBus

2017-06-20  本文已影响338人  兰兰笑笑生

RxBus 是 EventBus 在RxJava 的替代品 ,而在RxJava 中只需要短短几行代码就能实现。RxJava1 与 RxJava2 有些微不同,具体可以参考 What’s different in 2.0 。下面总结用Kotlin 实现的不同场景的 RxBus :

没有背压处理(Backpressure)的 Rxbus

在RxJava2里,引入了Flowable这个类来处理Backpressure,而Observable不包含Backpressure 处理。

object RxBus {
    private val mBus: Subject<Any> = PublishSubject.create()

    fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)

    fun toObservable(): Observable<Any> = mBus

    fun post(obj: Any) {
        mBus.onNext(obj)
    }

    fun hasObservers(): Boolean {
        return mBus.hasObservers()
    }
}

但是在调用toObservable(clzz: Class<T>)的时候,不要简单使用 XXX.class ,因为这样不符合Kotlin的语法,正确的的调用方式是 :

RxBus.toObservable(String::class.java).subscribe( ... )

有背压处理的 RxBus

object RxBus {
    private val mBus: FlowableProcessor<Any> = PublishProcessor.create()

    fun <T> toFlowable(tClass: Class<T>): Flowable<T> {
        return mBus.ofType(tClass)
    }

    fun toFlowable(): Flowable<Any> {
        return mBus
    }

    fun post(obj: Any) {
        mBus.onNext(obj)
    }

    fun hasSubscribers(): Boolean {
        return mBus.hasSubscribers()
    }
}

有异常处理的 Rxbus -基于 RxRelay

RxRelay 是既是Observable也是Consumer的RxJava 类型。

object RxBus3 {
    private val mBus: PublishRelay<Any> = PublishRelay.create()

    fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)

    fun toObservable(): Observable<Any> = mBus

    fun post(obj: Any) {
        mBus.accept(obj)
    }

    fun hasObservers(): Boolean {
        return mBus.hasObservers()
    }
}

关于RxRelay与Subject 的区别,网上一般说"RxRelay 即使出现异常也不会终止订阅关系" , 一开始看到这句话,有点蒙,因为根据我的实际验证,订阅者有异常时,这个订阅会自动取消,也不会影响其它订阅者,这个效果 RxRelay 与 Subject 是一样的。而它们真正的区别是

参考

上一篇 下一篇

猜你喜欢

热点阅读