RxJava3.0源码解读

2021-01-25  本文已影响0人  Li李萌

最近我准备在年后离职,所以就看了看RxJava的源码,相信我会加入到年后的求职大军中23333。其实现在突然离开苏州还有点不舍,从18年来到苏州我在这里呆了2-3年了,去过很多地方,有很多的朋友都在这里,也对这个公司很熟悉。现在开始做年后的面试准备,我会和大家一起看看源码,今天我们就从RxJava开始。之前RxJava用过很多次但是没有怎么看过它的源码,今天就好好看。

首先我们要弄清楚RxJava中的几个类:

  • Observable:被订阅者,是事件的来源,通过Emitter发射数据给Observer。
  • Observer:订阅者,通过注册(onSubscribe)过程传给被订阅者,订阅者监听开始订阅,监听订阅过程中会把Disposable传给订阅者,然后在被订阅者中的发射器(Emitter)发射数据给订阅者(Observer)。
  • Disposable:释放器,通常有两种方式会返回Disposable,一个是在Observer的onSubscribe方法回调回来,第二个是在subscribe订阅方法传consumer的时候会返回。
  • Emitter:发射器,在发射器中会接收下游的订阅者(Observer),然后在发射器相应的方法把数据传给订阅者(Observer)。
  • Scheduler:调度器,用于切换线程,不同的调度器(Scheduler)可以将代码放入到不同线程去执行和观察。
  • Consumer:消费器,消费器其实是Observer的一种变体,Observer的每一个方法都会对应一个Consumer,比如Observer的onNext、onError、onComplete、onSubscribe都会对应一个Consumer。

1、操作符create源码分析

下面是一个简单的实例代码:


Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

Observable.create()方法的实现是这样子的

 public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

里面new了一个ObservableCreate对象传到了RxJavaPlugins.onAssembly()中,而RxJavaPlugins.onAssembly()返回了什么呢?我们看一下下面的代码:

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

这段代码中因为我们没有初始化f所以它就是null,所以RxJavaPlugins.onAssembly()返回的就是传入的对象的本身,返回的就是source

2、subscribe()操作符的源码分析

以下代码有省略,我只放出来关键部分的代码

public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    public final void subscribe(@NonNull Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer); 
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
}

其中RxJavaPlugins.onSubscribe()和上面的RxJavaPlugins.onAssembly()一样,返回的就是我们传入的observersubscribeActual()调用的就是我们上游的 Observable.create()创建的ObservableCreate对象,它是Observable的子类。大家记住这个subscribeActual()方法它是之后我们要讲的其它操作符的关键。我们再看看subscribeActual()具体实现中做了什么?

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    }
}

subscribeActual()实际上它创建了一个发射器new CreateEmitter<>(observer)并且把我们在下游创建的Observer对面给传入进去了。而这个source就是我们一开始Observable.create()时我们自己创建的匿名对象,如下代码所示:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })

实际上source.subscribe(parent)就会调用到我们new的匿名对象里面,我们调用发射器的时候如emitter.onNext(1)就会执行发射器的中的onNext(),下面代码中的observer就是我们在subscribe()订阅时我们创建的Observer(观察者)匿名对象


 @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

小总结:

  • 当我们Observable.create()时就会new ObservableCreate<>(source)这个source就是我们创建的匿名对象,ObservableCreate作为它的成员变量保存起来。
  • 我们去调用subscribe()方法时,会执行上游的Observable中的subscribeActual()方法,而我们上游创建的是ObservableCreate对象,所以接下来它会执行ObservableCreate类中的subscribeActual(observer)方法。
    -subscribeActual()方法会把我们创建的Observer对象传入到CreateEmitter(发射器)中。并且执行source.subscribe(parent)。此时就会调用我们Observable.create()时创建的匿名对象中的subscribe()方法。
  • 当我们调用emitter.onNext(1)就会调用我们创建的Observer中的onNext()方法

3、map操作符源码解读

我们先看一段示例代码:

        Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        Observable<String> mapObservable = createObservable.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return String.valueOf(integer);
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        mapObservable.subscribe(observer);

通过上面我们知道create()操作符会创建一个ObservableCreate对象,同理map操作符会创建一个ObservableMap,他们都是被观察者都继承了Observable,他们能完成不同的功能其实就是通过subscribeActual()方法实现的。下面我们将具体来看一下。

    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

在调用map()时会创建一个ObservableMap对象。这个this就是createObservable对象,mapper就是我们创建的匿名对象。同理当我们调用subscribe()订阅方法的时候就会执行ObservableMap类中的subscribeActual()方法。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
 final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);//上游的被观察者对象,如ObservableCreate
        this.function = function;//我们在map()方法中传入的匿名对象
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));//创建一个观察者对象,再订阅上游的被观察者
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;//我们在map()方法中传入的匿名对象

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {//当执行了onComplete()或者onError()就不再执行
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
              //调用了我们创建的匿名对象中的applp()方法
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
          //downstream:下游的Observer对象
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }
    }
}

subscribeActual()方法中创建了一个观察者对象(MapObserver),同时去订阅上游的被观察者(ObservableCreate)。

小总结

  • map操作符在subscribeActual()中创建了一个观察者对象MapObserver,通过subscribe()去订阅上游的被观察者。
  • 当上游的被观察者ObservableCreate发生变化之后(如调用了emitter.onNext(1))就会执行MapObserver类中的onNext()方法。
  • 然后再调用我们传入的Function对象中的apply()方法。并将返回(v)值通过onNext()传入到下游的Observer(我们创建的匿名对象)中的onNext()方法中。

4、subscribeOn操作符(线程切换)

如下示例代码,我们通过subscribeOn()操作符去切换线程,从而使被观察者在哪个线程去执行。

        Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        
        Observable<Integer> observableSubscribeOn = createObservable.subscribeOn(Schedulers.io());

        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        observableSubscribeOn.subscribe(observer);

下面我们看一下subscribeOn()的具体实现:

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

subscribeOn()中创建了一个ObservableSubscribeOn(被观察者)的对象

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //创建一个新的观察者(SubscribeOnObserver)并将下游的观察者observer传入进去
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        //调用下游的observer的onSubscribe方法,这里我们可以看见,下游的onSubscribe还是在当前线程执行的
        observer.onSubscribe(parent);
        //这句代码的本质就是将一个实现了Runnable接口的SubscribeTask放到线程池去执行
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

当执行了subscribeActual()方法时里面包装了一个SubscribeOnObserver(观察者),并将下游的observer作为成员变量。并创建一个SubscribeTask对象,将这个对象交由scheduler去执行。scheduler就是Schedulers.io()SubscribeTask 本身就一个实现了 Runnable接口的类,当线程开始实行的时候就会执行run()方法中的 source.subscribe(parent),所以我们才可以将其放到IO线程去执行。剩下的onNext(),__ onError()__,onComplete()类似都是调用下游的observer(观察者对象)。
从上面的源代码中我们明白了,不管subscribeOn()执行了多少次只会以第一次为准。

下面我们在看一下scheduler

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

当我们在调用Schedulers.io()时返回是一个IO的成员变成,它其实是在静态代码块中进行初始化的。

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        //初始化IO
        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

下面的initIoScheduler()就是返回一个SchedulercallRequireNonNull(defaultScheduler);方法的实质就是获取Scheduler,最后通过new了一个IoScheduler对象。

    @NonNull
    public static Scheduler initIoScheduler(@NonNull Supplier<Scheduler> defaultScheduler) {
        Objects.requireNonNull(defaultScheduler, "Scheduler Supplier can't be null");
        Function<? super Supplier<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
 static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }

   static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

小总结

  • subscribeOn(Schedulers.io())的实质就是将被观察者(Observable)放入到线程池中去执行订阅source.subscribe(parent)

observeOn操作符源码解读

 Observable<Integer> observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());

通过上面我们关于subscribeOn()源码解读我们知道其实质就是将subscribe()放入到子线程去执行,所以也能猜到observeOn()就是将onNext()放入到主线程去执行,下面我们来看一下源码。

    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

observeOn()里面创建了一个ObservableObserveOn对象,它同样也是一个被观察者继承至Observable,并且显示了Runnable接口。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //通过我们传入的scheduler,获取Worker 
            Scheduler.Worker w = scheduler.createWorker();
            //订阅是在当前线程执行的
            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
   ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
  }
}

ObserveOnObserver收到上游的被观察者信息,调用onNext(),__ onError()__等方法时,就会将其放入到线程中去执行。关于drainFused()drainNormal()的本质就下游的onNext()方法放入到主线程去执行(因为这两个方法行数有点多,我没有放入到上面的代码中去)。

小总结

  • 我们看见observeOn()操作符和前面的一样都是将其封装成一个继承至ObservableObservableObserveOn对象。
  • 在其内部通过subscribeActual()订阅上游的被观察者,并且将下游的Observer(观察者)和Scheduler.Worker包装到它的静态内部类中(ObserveOnObserver)。
  • 当上游的被观察发生变化的时候就会调用schedule()方法,将下游的onNext()onError()等回调放入到observeOn()中传入的线程中去执行。

最后

RxJava的本质就是观察者模式,不同的操作符都是返回一个被观察者(Observable),并且将下游的观察者封装到另一个Observer中(如:将下游的Observer封装到MapObserver中),操作符能完成特定的功能是因为特定的ObservableObserversubscribeActual()onNext()onSubscribe(),onError()等方法中完成了具体的实现。

上一篇下一篇

猜你喜欢

热点阅读