Android学习之旅

RxJava基本使用-源码解析(一)

2019-10-26  本文已影响0人  pj0579

最简单的使用方法是这样的

       // 被观察者发送事件  观察者响应事件
      Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        });

这边分为两个步骤
1.create
2.subscribe
create

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
         // 判空操作
        ObjectHelper.requireNonNull(source, "source is null");
        // 继续包装
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
 // 看下onAssembly方法 实际返回的就是ObservableCreate类对象
 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

由上面可以知道subscribeObservableOnSubscribe类的方法,在ObservableOnSubscribe类中找不到,很容易在Observable中找到subscribe的方法。

 public final void subscribe(Observer<? super T> observer) {
        // 判空操作  
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // 返回observer对象
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

subscribeActual(observer);是这段代码的关键 调用的是ObservableCreatesubscribeActual方法

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
         // 实现ObservableEmitter<T>把这个对象传递到subscribe 通过这个对象发送事件
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 调用我们代码里写的onSubscribe方法
        observer.onSubscribe(parent);

        try {
            // 回到我们代码里写的subscribe方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

到这里最简单的调用过程就结束了

上一篇下一篇

猜你喜欢

热点阅读