RxJava手写rxjava

RxJava 源码学习手写框架 一

2020-09-29  本文已影响0人  tingtingtina

能看到这篇文章,真的不会亏的,尤其对于不了解其原理的朋友们。


Part 1:使用观察者模式

RxJava 主要运用了观察者模式,只不过在观察者模式中,一个被观察者可以有多个观察者。在 RxJava 中是使用的是变种观察者模式,一个被观察者只有一个观察者。
既然是观察者模式,首先需要有个观察者和被观察者。

有个抽象的观察者接口 Observer,接口很简单,我们常见的四个方法。

public interface Observer<T> {
    //接收消息 update
    void onNext(T t);

    // 建立关联时
    void onSubscribe();

    // 接收异常消息
    void onError(Throwable e);

    // 接收消息完成
    void onComplete();
}

还有个抽象的被观察者类 ObservableSource,里面只有一个方法,就是用来订阅被观察者 Observer。

public interface ObservableSource<T> {
    // 绑定 Observable 与 Observer, 订阅
    void subscribeObserver(Observer<T> observer);
}

创建具体的被观察者 Observable,实现了 ObservableSource。到此,一个观察者模式的框架就完成了。

public abstract class Observable<T> implements ObservableSource<T> {
    @Override
    public void subscribeObserver(Observer<T> observer) {
        observer.onNext(msg);
    }
}

到这里,我们看到源码中并没有直接使用 onNext,而是使用了模板方法,把具体的订阅逻辑交给子类处理。Observable 做下改进,抽象出一个 subscribeActual 方法。
一下。

public abstract class Observable<T> implements ObservableSource<T> {
    @Override
    public void subscribeObserver(Observer<T> observer) {
        // observer.onNext(msg);  不这样使用,而是把逻辑交给框架使用者
        // 把功能留给不同的 Observable 处理
        // map  flatmap ……
        subscribeActual(observer);
    }

    // 模板方法 把订阅过程的逻辑交给子类去做
    protected abstract void subscribeActual(Observer<T> observer);
}

Part 2: 使用发射器发送消息

写到这里,还缺少一个发送消息的操作,也就是模式中的 pushMessage。那么先打包一个发射器,在里面实现 observer.onNext 等操作。为了便于扩展,也将发射器定义成接口,可以扩展其实现类。

public interface Emitter<T> {
    //接收消息 update
    void onNext(T t);

    // 接收异常消息
    void onError(Throwable e);

    // 接收消息完成
    void onComplete();
}

现在已经有观察者和被观察者了,他们通过 subscribeActual 建立了绑定关系,现在也有发送消息的发射器,但Observable 和 Emitter 还没有建立联系。

现在要实现:Observable 使用 发射器发送消息。因此这里使用一个接口 ObservableOnSubscribe 来绑定发射器,里面只有一个方法就是绑定一个发射器,被观察者通过这个接口就实现了与发射器的绑定,同时观察者和发射器也实现了分离。

public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter);
}

Part 3: 具体的被观察者

到现在,基本的框架已经搭建差不多了,现在可以来写下被观察者的具体实现

public class ObservableCreate<T> extends Observable<T> {
    ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        observer.onSubscribe();
        CreateEmitter createEmitter = new CreateEmitter(observer);
        source.subscribe(createEmitter);

        // observable 与 emitter 通过 ObservableOnSubscribe 绑定
        // emitter 与 observer 通过 observable 绑定
    }

    // 发射器实现类
    static final class CreateEmitter<T> implements Emitter<T> {
        Observer<T> observer;

        public CreateEmitter(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}

现在这个壳子就搭好了,RxJava 在创建被观察者的时,是这样使用的 Observable.create() ,所以我们也在 Observable 中创建一个 create() 方法,返回 Observable 对象。

Observable.create(new ObservableOnSubscrib
    @Override
    public void subscribe(Emitter<Integer> emitter) {
        emitter.onNext(1);
    }
})
        .subscribeObserver(new Observer<Integer>() {
            @Override
            public void onNext(Integer o) {
                log("onNext: " + o);
            }
            @Override
            public void onSubscribe() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

Run 一下,结果必须是可以的 666

怎么样,RxJava 就是这么处理的,这里涉及到了一些类,看一下类图


左侧绿色框中用来发送消息,中间蓝色框是被观察者,后边是观察者,这三个部分是相对隔离的。

这三部分的关系其实很清晰



当 Emitter 有动作的时候,比如调用 onNext,就会触发 observer.onNext 的调用。

知道真相的我眼泪掉下来。
了解基本原理之后再去使用相关 API 就会少了很多问号了。

上一篇 下一篇

猜你喜欢

热点阅读