手写简单Rxjava理解其内部实现(一)

2021-09-27  本文已影响0人  进击de小黑
//被观察者
interface ObservableSource<T> {  
    //订阅 
   fun subscribe(observer: Observer<T>)
}
//观察者
interface Observer<T> {    
    //订阅
    fun onSubscribe()    
     //事件发送
    fun onNext(t: T)    
   //错误
    fun onError(t:Throwable)    
     //事件完成
    fun onComplete()
}
abstract class Observable<T> : ObservableSource<T> {
    override fun subscribe(observer: Observer<T>) {
        subscribeActual(observer);
    }
    //抽象订阅方法,这里会传入观察者的对象,交给数据发送者
    protected abstract fun subscribeActual(observer: Observer<T>)
}
interface Emitter<T> {
    fun onNext(t: T)
    fun onError(t:Throwable)
    fun onComplete()
}
interface ObservableOnSubscribe<T> {
    //被订阅者在此实现方法里发送数据
    fun subscribe(emitter: Emitter<T>)
}
//这里构造方法传入了数据发送者与被观察者建立联系具体实现
class ObservableCreate<T>(var observableOnSubscribe: ObservableOnSubscribe<T>) : Observable<T>() {
   //被观察者订阅观察者时,具体实现数据发送
    override fun subscribeActual(observer: Observer<T>) {
        val emitterCreate = EmitterCreate(observer)
        observableOnSubscribe.subscribe(emitterCreate)
        //触发了onSubscribe
        observer.onSubscribe()
    }
      //构造方法传入了观察者
    class EmitterCreate<T>(private val observer: Observer<T>) : Emitter<T> {

        override fun onNext(t: T) {
             //观察者接收到被观察者通过Emitter发送的数据
            observer.onNext(t)
        }

        override fun onError(t: Throwable) {
            observer.onError(t)
        }

        override fun onComplete() {
            observer.onComplete()
        }
    }
}

这样我们就完整创建出了:观察者、被观察者、数据发送者、数据发送者与被观察者联系接口。
我们在Observable里实现create(source: ObservableOnSubscribe<T>)创建一个ObservableCreate实例,并传入一个ObservableOnSubscribe用作数据发送。

    companion object {
        fun <T> create(source: ObservableOnSubscribe<T>): Observable<T> {
            return ObservableCreate(source)
        }
    }
  Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: Emitter<String>) {
                emitter.onNext("HELLO")
            }
        }).subscribe(object : Observer<String> {
                override fun onSubscribe() {
                    TODO("Not yet implemented")
                }

                override fun onNext(t: String) {
                    TODO("Not yet implemented")
                }

                override fun onError(t: Throwable) {
                    TODO("Not yet implemented")
                }

                override fun onComplete() {
                    TODO("Not yet implemented")
                }


            })

以上我们通过观察者模式就简易实现了,观察者及被观察者数据发送的建立联系。
后续将继续通过装饰者模式实现map操作符及线程切换等操作,进一步理解Rxjava。

上一篇 下一篇

猜你喜欢

热点阅读