Android相关

RxJava 2 源码解析之创建-订阅-变换-发布

2018-01-22  本文已影响151人  LeonXtp

本文源码基于2.1.8版本。

一段非常典型RxJava使用流程:

        Observable
                .create(new ObservableOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer);
                    }
                })
                .subscribeOn(Schedulers.computation())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("RxJava2", s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

那么它内部是怎么执行起来的呢?

Observable.java

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

RxJavaPlugins.onAssembly()在这里仅仅是返回了参数中的Observable而已,重点关注ObservableCreate:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // ... 省略
}

它的构造方法仅仅是保存原始的ObservableOnSubscribe。那么,最终还是看看ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

它只是一个接口,而且RxJava没有它的默认实现,我们全都需要自己实现。

先跳过subscribeOn和map方法,直接看subscribe():

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

这里代码虽然很多,但是核心代码就是

observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);

而已。

    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }

这里基本也跟之前一样,没干什么事,直接返回原来的Observer。那么subscribeActual呢?

    /**
     * Operator implementations (both source and intermediate) should implement this method that
     * performs the necessary business logic.
     * <p>There is no need to call any of the plugin hooks on the current Observable instance or
     * the Subscriber.
     * @param observer the incoming Observer, never null
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

它只是一个抽象方法,那么它的实现在哪里呢?
之前我们创建Observable的时候:我们在create()方法里创建了一个ObservableCreate对象,就是它了,看看它的subscribeActual()是怎样的:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

首先创建一个继承自Emitter的CreateEmitter,然后调用Observer的onSubscribe()方法,
再调用ObservableOnSubscribe的subscribe()方法,这个方法就是我们的代码里的回调了:

        Observable
                .create(new ObservableOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                    }
                })

然后我们在回调里执行了

emitter.onNext(1);

进去看看它做了什么:

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

Emitter只是一个接口,它的实现是CreateEmitter,再看看它儿子长什么样:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return emitter.isDisposed();
        }

        // 其他代码省略

它是ObserverbleCreate的静态内部类!它仅仅就是调用Observer的onNext方法而已,其他也差不多。

至此,一个RxJava2的基本执行流程就分析完了,看起来还是很简单的。

那么再看看map操作:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

这套路跟之前的create很像,而且ObservableMap跟之前的ObservableCreate也很类似,它继承自AbstractObservableWithUpstream,这个暂时不管,也是继承自Observable的。

不过这里要注意的是,这里执行的是ObservableCreate的实例的map方法。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // 给Observable重新订阅一次监听者
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

       // 省略
    }
}

我们之前在分析create的时候,它创建了一个ObservableCreate对象,在我们subscribe的时候,调用了这个对象的subscribeActual()方法,最终它在这里调用了ObservableOnSubscribe的subscribe()方法。

而这个MapCreate也是类似,只不过ObservableCreate中保存的source是ObservableOnSubscribe,而MapCreate中保存的,是ObservableSource,它是Observable的父类,因为,Observable在create后就已经创建成功了嘛。

在我们这个例子中,MapCreate中保存source的是之前创建好的ObservableCreate实例。它有个静态内部类MapObserver,它是BasicFuseableObserver的子类,而这个家伙是Observer的子类。

OK,现在再重新看看加入了map操作符之后,新的subscribe执行流程:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

在这里创建了MapObserver实例,并调用ObservableCreate实例的subscribe()方法,并将这个Observer传入进去。也就是说,这时候ObservableCreate的观察者是一个中间人。

    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {

            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }

            if (beforeDownstream()) {

                actual.onSubscribe(this);

                afterDownstream();
            }

        }
    }

这里的actual就是实际上我们定义的真正的Observer。

再次贴出它的onNext()代码:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

前面说过,它的observer现在已经是MapObserver了,所以来到了这里:
MapObserver.java

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

重点关注这两行:

ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");

actual.onNext(v);

这里的mapper就是我们map()操作符中创建的Function参数了,Function接口定义如下:

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

这时,我们的map就回调成功了,完了之后它就调用了真正Observer的onNext()方法。

至此,一次包含map()操作符的RxJava2的事件创建、订阅、map转换、事件发射、事件响应流程就解析完了。

从以上解析可以看出,不含map操作的时候,流程还是很简单的,包含了map就复杂些了。那么看完后,抛出个问题:map究竟是怎样实现变换的呢?如果我再给你多加几次map操作呢?哈哈,是不是有点晕?

其实总结起来,它就是这么一个思想:

这样就完成了操作符的无缝衔接。
至于那些变换,只是在链条中的事件形式变换而已。

上个总结图:


rxjava2-create-subscribe-emit-flow.png

这样就很清晰了,整个创建流程是从上到下,然后整个订阅流程又从下到上,事件发布时,又是从上到下,有木有,有木有,有木有!

再去看看其他操作符,都是这个套路,理解了这个,理解RxJava2就容易多了。

上一篇 下一篇

猜你喜欢

热点阅读