Android开发Android开发经验谈Android技术知识

Android RxJava2.x(二)线程调度和操作符源码分析

2020-04-19  本文已影响0人  程序员三千_

Android RxJava2.x(一)整体框架分析

使用过RxJava的人都知道,方便的Api链式调用外,还有一个非常好用的地方,就是线程调度和操作符的使用,我们来举个例子

    Observable.just("test")   
                //指定了被观察者执行的线程环境为io线程
                .subscribeOn(Schedulers.io())
                //使用map操作来完成类型转换
                .map(new Function<String,String>(){
                    @Override
                    public String apply(String s) throws Exception {
                        //这里执行一个耗时操作,比如IO操作
                        return s + "map";
                    }
                })
                //将后面执行的线程环境切换为主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                Log.i("xiaosanye","s: " + s);
                            }
           });

1、源码分析observable.subscribeOn和observable.observeOn的内部实现

Observable#observable.subscribeOn
=>return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
=> ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>

     @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

observable.subscribeOn通过return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))方法,将this也就是observable本身传进去,最终返回一个ObservableSubscribeOn对象,ObservableSubscribeOn继承自AbstractObservableWithUpstream
AbstractObservableWithUpstream继承自Observable并且实现HasUpstreamObservableSource接口,
所以ObservableSubscribeOn对象本身是一个被观察者,
我们我们可以得出结论,调用observable.subscribeOn最后实际上是返回了一个新的被观察者ObservableSubscribeOn

我们再看observable.observeOn

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Observable#observable.observeOn
=>return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))
=>ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>

同理调用observable.observeOn通过return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))方法,也将this也就是observable本身传进去,最终返回的也是一个新的被观察者ObservableObserveOn

由上一篇文章我们知道,我们在Observable.create的时候返回一个被观察者ObservableCreate对象,所以我们创建observable的时候实际上返回的是ObservableCreate对象。

image.png

所以最后,我们Observable.subscribe订阅的时候实际上调用的就是经过装饰的最外层ObservableObserveOn

细心的小伙伴肯定也看到了,其实在它们各自的subscribeActual方法里也会对观察者对象进行一次装饰
ObservableObserveOn#subscribeActual

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

ObservableSubscribeOn#subscribeActual


 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

最后返回对应的观察者对象ObserveOnObserver、SubscribeOnObserver

最后还有一点要注意的是:

那为什么会这样呢?我们看到它们各种的subscribeActual方法里,observeOn是每次都会new一个ObserveOnObserver,ObserveOnObserver里都会传入一个observer和Scheduler.Worker,而subscribeOn只是调用自身的scheduler.scheduleDirect方法。

2、源码分析操作符Map的内部实现

Observable#map

   @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

经过上面分析,这段代码很熟悉了把,把this也就是Observable对象和mappr(我们自定义的函数fuction)传进去,最终肯定是返回一个ObservableMap被观察者对象的,我们再点进去ObservableMap看看,


public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
           if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
         ....省略部分代码
}

我们看到在subscribeActual方法里有一句代码source.subscribe(new MapObserver<T, U>(t, function));这句就是RxJava2.x所有操作符的核心代码了。
source就是我们上游传进来的被观察者,然后调用source.subscribe把function传进去,我们知道在被观察者subscribe完,肯定是走下面的onNext方法了,我们看到onNext里的一句关键代码mapper.apply(t),这句就是我们实际转变的地方,也就是我们在外层实现的apply方法。

在RxJava2.x中,所有操作符的核心原理,都是继承AbstractObservableWithUpstream,在subscribeActual方法的 source.subscribe里实现自己的核心原理,如果我们自己想自定义操作符的话其实也可以继承AbstractObservableWithUpstream然后实现subscribeActual来完成。(不过基本RxJava里的操作符都够用了)

上一篇 下一篇

猜你喜欢

热点阅读