安卓开发博客

手写 RxJava ---- Create 函数

2019-11-13  本文已影响0人  石器时代小古董

一、定义观察者接口

RxJava 的观察者接口

public interface Observer<T> {
    void onSubscirbe();

    void onNext(T value);

    void onComplete();

    void onError(Throwable throwable);
}

二、定义被观察者接口

使用 subscribe 函数中的 observableEmitter 是观察者的引用,通过 SelfObserverable 的 subscribe 拿到的

public interface ObservableOnSubscribe<T> {
    public void subscribe(Observer<? super T> observableEmitter);
}

三、创建外观类

SelfObserverable 作为对外的入口类,提供了 create 函数,create函数会创建 SelfObserverable 实例并通过 SelfObserverable 的构造函数持有 ObservableOnSubscribe (被观察者类)的引用。

注意:create 函数的第一个 T 是为这个静态方法声明一个泛型,第二个 T 是类声明的泛型但传入的是 create 函数声明的类型,参数 T 使用的是函数 create 声明的类型

public class SelfObserverable<T> {
    ObservableOnSubscribe source;

    private SelfObserverable(ObservableOnSubscribe source) {
        this.source = source;
    }
    // create 方法在创建时持有了被观察者---ObservableOnSubscribe
    // 第一个 T 是静态方法声明的 T     参数的 T 是静态方法声明的泛型
    // 上下线在作为参数时 表示的就是类型的界限
    // 当泛型作为参数时会有可读和可写的区别  super 表示参数是可以被写的  extends 只能是可读
    public static <T> SelfObserverable<T> create(ObservableOnSubscribe<? extends T> subscribe) {
        return new SelfObserverable<T>(subscribe); // 返回的是静态方法返回的 T
    }
}    

三、创建订阅方法

提供订阅方法让被观察者 (ObservableOnSubscribe) 持有了观察者 (Observer) 的引用。这样在 ObservableOnSubscribe 中通过 ObservableOnSubscribe 函数

    /**
     * 在使用订阅时 首先触发观察者的 onSubscribe 方法
     *
     * 这里的 T 是指类中声明的类型
     *
     * @param observer
     */
    public void subscribe(Observer<? extends T> observer) {
        // 触发订阅完成
        observer.onSubscirbe();
        source.subscribe(observer);
    }

四、在 subscribe 方法中触发 Observer 的回调

因为在调用 subscribe 函数时持有了 Observer 的引用(见第三条),这时可以在 ObservableOnSubscribe 的 subscribe 函数中拿到这个引用来触发 Observer 对象的函数。

 SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
            // 这里必须是  ? super Integer  这种可读的模式
            // 否则 onNext 无法传入 1 这个参数 编辑器报错
            @Override
            public void subscribe(Observer<? super Integer> observableEmitter) {
                // observableEmitter 就是下游
                // 我们 new 出来的 Observer 对象 只不过这个引用被 ObservableOnSubscribe 持有了
                observableEmitter.onNext(1);
                observableEmitter.onComplete();
                observableEmitter.onError(new NullPointerException());
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscirbe() {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable throwable) {

            }
        });

五、泛型的使用

1 .<? extends T> 在作为参数的类型进行声明时表示 T 只能是 T 以及 T 的子类。

2.<? extends T> 所声明的参数在被使用时,表示只可读,比如 observableEmitter.onNext(1); onNext 的类型如果是 <? extends T> 类型,编译器将报错。

3.<? super T> 作为参数类型声明时表示只能是 T 以及 T 的父类

4.<? super T> 所声明的参数在被使用时表示可写。如 observableEmitter.onNext(1); 可以正常使用

上一篇 下一篇

猜你喜欢

热点阅读