工作生活

从源码分析RxJava订阅过程

2019-06-30  本文已影响0人  有没有口罩给我一个
wdroid.jpg

都知道观察模式吧?

在开始之前让我们简单了解一下观察模式,就是某对象A的变化引起其他多个对象B变化,但是前提是你需要去订阅我,打个比方:就是我的状态发生了改变,那我怎么通知你呢?所以我需要知道的如何去通知其他对象说我这里已经改变了,你看看那需不需要做出改变。就比如微信的订阅号,如果你不订阅,那该订阅号在发布内容也不会通知,这里的订阅号就是被观察者,而用户就是观察者。那怎么说让这两者关联来呢?前面说的订阅号是要提供一个接口,允许用户去订阅的,所以最后就是被观察者和观察者两个都得提供接口,订阅号提供的接口让用户去订阅类比微信号,当订阅号发布内容,就通过这个微信号通知观察者,所以订阅就是这两者的关联点。

开始之前的两个重要的类或接口:ObservableObserver

订阅流程分析

开始RxJava的订阅流程分析之前,来个简单的栗子,代码如下:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

日志结果:

onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3

如上代码,之所以分开来写是为了更清晰的去理解每一步RxJava生成的相关类。

如果你认真看前面的内容,你一下就明白Observable.subscribe()方法也就是订阅的意思,是 ObservableObserver 的关联点,也就是被观察者和观察者的关联点,所以我们的分析就从Observable.subscribe(Observer observer)方法开始代码如下:

 public final void subscribe(Observer<? super T> observer) {
    try {

        // .....此处省略几亿代码....

        //此方法在Observable类是中是抽象的,注定是子类实现
        subscribeActual(observer);

        // .....此处省略几亿代码....
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
       // .....此处省略几亿代码....
    }
}

// Observable.create(ObservableOnSubscribe)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // .....此处省略几亿代码....

    // 直接就创建了ObservableCreate,并把source作为参数传进去
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

// onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们从上面代码我们知道在Observable.create(ObservableOnSubscribe)中直接就创建了ObservableCreate,而ObservableCreate是Observable的子类,并把source作为参数传进去,最后调用RxJavaPlugins.onAssembly方法,我们默认返回ObservableCreate实例,所以Observable.create方法最后返回的是ObservableCreate实例,所以就验证了上面的第三点实际调用的是ObservableCreate.subscribeActual(observer)方法,这是在不考虑其他变换和线程切换的情况,那我们就来看看ObservableCreate.subscribeActual(observer)方法的实现,代码如下:

 @Override
protected void subscribeActual(Observer<? super T> observer) {
    //事件发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //直接回调Observer的onSubscribe方法,这个方法是和线程切换无关,只在当前的线程中执行
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

代码不多,也很好理解:

看一下CreateEmitter的实现,代码如下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    //通过构造方法注入 观察者实例
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

   // .....此处省略几亿代码....

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

   // .....此处省略几亿代码....
}

为了简介清晰我删掉很多无关代码,只保留onNext等这些相关的方法。

有没有发现从一开始我们就仅仅讲了从Obsevable的创建到订阅,这是比较汉理解的,如果我增加一个map或线程切换呢?这里暂时不展开讲线程切换。

重新把栗子的代码在贴一遍:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
        @Override
        public Object apply(String s) throws Exception {
            Log.e("tag", "map");
            return "aa";
        }
    })

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

如上代码,订阅流程会和之前的有什么不一样呢?那么我们看个究竟,就从 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())开始,代码如下:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //这里把上游this传进去也就是source,以便调用上游的subscribe方法
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

从上面代码看,我们知道在map方法中创建了ObservableMap并把上游的Observable参进去了,而我们知道从Observable.subscribe方法开始订阅就会调用 subscribeActual(observer)方法,所以在Observable.subscribe之后就会调用ObservableMap的subscribbeActual方法,代码如下:

 public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

在ObservableMap的subscribbeActual方法中,直接调用传进来的Observable的subscribe方法又间接调用subscribbeActual方法没所以,订阅的过程实际上是一样的。

总结

上一篇下一篇

猜你喜欢

热点阅读