RxJava2 源码解析(一)

2019-08-08  本文已影响0人  徘徊0_

简述:本篇主要分析 Observable、Observer 产生(create)、关联(subscribe)、数据发送(emitter)的过程!

下面这段代码,是常规的RxJava的操作,从这段代码入手分析:

        //1- create 一个 Observable
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("This is Create!");
            }
        }).subscribe(new Observer<String>() { //订阅一个 Observer
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

一、Create 被观察者 Observable 过程

1,被观察者Observable

//抽象类,具体实现由子类来做
public abstract class Observable<T> implements ObservableSource<T> {
 .....
}

2,接着看 Observable . Create 方法

        //1- create 一个 Observable
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("This is Create!");
            }
        })//....省略部分代码

//*****************************分割线*********************************

  public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

这里需要注意:

3,上面create方法提到了3个重要的地方,ObservableOnSubscribe、ObservableCreate、RxJavaPlugins.onAssembly 继续往下看

RxJavaPlugins.onAssembly 分析:;

onAssembly方法.png
这个方法需要一个Observable作为参数,上面示例代码中,直接new ObservableCreate<T>(source)作为参数,其中这个source也就是上面的ObservableOnSubscribe。这里又提到ObservableCreate这个类,顾名思义就是:被观察者生产类,继续往下看:

ObservableOnSubscribe,需要注意里面的ObservableEmitter后面会分析,源码如下:

public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

ObservableCreate 分析:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //这里的 ObservableOnSubscribe 作为参数传递进来
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //注意,这个方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 将观察者作为参数,传给 CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //将parent 传到了onSubscribe()
        observer.onSubscribe(parent);

        try {
            // 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
  //...........省略部分代码
}

注意

接着具体看下CreateEmitter这个类:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        //******************1************************
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //********************2***********************
            //如果没被阻断,调用观察者的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        //.... 省略部分代码
    }

注意,上面标识的地方:

CreateEmitter 实现了 ObservableEmitter<T>, Disposable 接口

public interface ObservableEmitter<T> extends Emitter<T> {
  //.... 省略代码....
}

这里ObservableEmitter又继承自Emitter,接下来看一下Emitter

public interface Emitter<T> {
    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

Emitter: 这里面就是我们经常看到的三个方法!

到这里也就说明了,为什么上面代码中emitter.onNext("This is Create!"); 会走到下面观察者的onNext()方法了。

Create方法总结:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

Observable.create() 方法需要一个ObservableOnSubscribe作为参数,然后又将这个ObservableOnSubscribe作为参数传给了new ObservableCreate<T>(source) , 并返回,这也就是生成的被观察者。

二、接着看 Observable . subscribe() 订阅方法

{@link Observable#subscribe()}这个方法主要是:被观察者订阅观察者:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //可以忽略
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            //注意这个方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

subscribeActual 分析:

protected abstract void subscribeActual(Observer<? super T> observer);
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //这里的 ObservableOnSubscribe 作为参数传递进来
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //注意,这个方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 将观察者作为参数,传给 CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //将parent 传到了onSubscribe()
        observer.onSubscribe(parent);

        try {
            // 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
  //...........省略部分代码
}

注意:

三、观察者:Consumer 分析

在被观察者订阅观察者的时候,可以发现有好几个重载方法,上面分析了Observer

订阅方法.png

但是Observer需要实现所有的方法,如果只需要onNext()、onError(); 就需要使用到Consumer这个接口类:

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

如果只需要关注onNext()方法,可以调用Observable的这个调用方法,参数为Consumber

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

继续跟到subscribe(Consumer<? super T> onNext) ;方法:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        //一系列的判空操作
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        //重要!!!
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        //这里就是调用具体的实现类的方法,和上面的流程一样
        subscribe(ls);

        return ls;
    }

这里主要是LambdaObserver这个类,可以看出,这个类也是一个观察者:

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    //构造方法接收三个参数,对应 next,error,complete
    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                d.dispose();
                onError(ex);
            }
        }
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onComplete.run();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }

    @Override
    public boolean hasCustomOnError() {
        return onError != Functions.ON_ERROR_MISSING;
    }
}

LambdaObserver这个类也是一个Observer和之前的观察者一样,都包含next、error、complete这几个方法,也是将LambdaObserver传入到ObservableCreate 与被观察者产生关联。

这里可以只传入我们关心的回调,例如上面只传入了onNext()的回调,原因如下:

 public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

这里的Functions.ON_ERROR_MISSING其实也是一个Consumer,可以理解为一个占位,例如当error回调的时候,我们并没有实现,他会回调到下面这个默认实现中去:

public static final Consumer<Throwable> ON_ERROR_MISSING = new OnErrorMissingConsumer();

//---------------------------------------------------------
  static final class OnErrorMissingConsumer implements Consumer<Throwable> {
        @Override
        public void accept(Throwable error) {
            RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
        }
    }

上一篇下一篇

猜你喜欢

热点阅读