Android开发Android进阶之路Android技术知识

Rxjava2源码浅析(一)

2017-03-17  本文已影响0人  Sp_WannaSing

面试的时候被问道各种框架的原理架构,也是很尴尬,自以为写的代码不少,用过的框架也不少,深入的去研究源码的还真是不多,也是给自己敲了一个警钟,今天就来尝试剖析一下Rxjava2的源码,水平有限,就先看一下基础的用法相关,一些难度更高的操作符就慢慢来分析吧。
就按照平时使用的顺序来分析:

一、初始化Observerble

基本使用实例:

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aaa");
            }
        });

先看一下内部的参数 ObservableOnSubscribe<>() 。

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

就是一个接口,这里用的就是它的一个匿名实现类。而接口内部的方法中我们看到ObservableEmitter<> 是一个Rxjava2新推出的类,俗称发射器。

public interface ObservableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    ObservableEmitter<T> serialize();
}

这里面几个回调方法的作用注释也说的很清楚了就不多说了。
它继承自Emitter

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

可以看到,这里面就是我们比较熟悉的next、complete、error三个回调方法了。其实这个create方法内部的参数就是两个接口的回调,理解就行了,然后看一下create方法。

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

requireNonNull是很好理解的,看名字也能猜测出是测试传进来的ObservableOnSubscribe是否为空

  public static <T> T requireNonNull(T object, String message) {
        if (object == null) {
            throw new NullPointerException(message);
        }
        return object;
    }

而源码也验证了我们的想法。关键是后面一句,先看一下具体的方法实现。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

一句一句来分析:

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

这里的Function也是一个接口,作用也很明显,将T类型的数据转化成R类型数据。那是我们在使用到

        observable.map(new Function<String, Object>() {
            @Override
            public Object apply(@NonNull String s) throws Exception {
                return null;
            }
        })

类似这种类型转换的语句时候才会用到,这里我们先不管它,一开始是默认为null的,所以这个方法最后就会return source;就是将括号中的new ObservableCreate<T>(source)原样返回。这个ObservableCreate又是什么呢?

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

源码比较长我们就之看一下它的构造函数就可以了,目前只需要知道这是一个Observerble的子类就可以了,至于Observerble这个类,等到大概摸清楚了事件流程再回头来分析。所以到现在我们的第一步初始化就算是分析完了流程。

二、初始化一个Observer

用法示例:

Observer<String> observer=new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                subscription=d;
            }

            @Override
            public void onNext(String value) {
                LogUtil.log(TAG,"  "+value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                LogUtil.log(TAG,"complete");
            }
        };

这个分析就要简单很多了,Observer只是一个简单的接口,这里也只是具体实现了一下接口回调。

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

不过这里和Rxjava1也是有些区别的,多了一个onSubscribe 注释也说的很清楚,用于随时取消订阅。
第二步很轻松,下面看一下第三步

三、建立订阅关系

用法示例:

observable.subscribe(observer);

这里我们就只分析最简单的一种,看一下源码:

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

第一句还是一样,判断是否为空,平时自己写代码也要像这样注意代码的健壮性。
重点就是这三句了。

 observer = RxJavaPlugins.onSubscribe(this, observer);

 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

 subscribeActual(observer);

一句一句来看:

 public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }

在这个onsubscribe中是不是觉得有些眼熟?就跟刚刚的onAssenmbly几乎一样,由于我们没有其它的功能,所以这里onObservableSubscribe也是null,也是返回原值,下面的requireNonNull我们也见过了,又验证一遍是否为空,因为如果我们加入了Function函数,上面就不会返回原来的observer了,所以还要再验证一遍。
于是就到了最后一句

protected abstract void subscribeActual(Observer<? super T> observer);

???
怎么是个abstract方法?那么它是在哪实现的呢?
回想看我们的observable初始化过程。哪里出的问题呢?就是我们一开始没有分析的ObservableCreate,我们在初始化的时候就将一个ObservableCreate类向上转型赋值给了Observerble,所以方法的具体实现也就在ObservableCreate里了。
继续跟进。果不其然:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

还是一句一句来看

CreateEmitter<T> parent = new CreateEmitter<T>(observer);

又是一个新的类

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

所以我们的前两句

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

所以我们的前两局就是回调了onSubscribe接口,从而将这个CreateEmitter类型转型成Disposable输出了。而CreateEmitter的初始化参数又是observer本身,所以大体上可以看成回调了另一个格式的自己。。。然后一般可用于自杀(取消订阅)。。。
然后就来到的最后一句

source.subscribe(parent);

这里的source就是

new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        }

刚刚我们初始化observable传入的。这个parent->这里的参数e。于是就这样完成了Observerble和Observer的绑定,也就能实现接口回调了。

没有任何其它功能,只是走了一边最基本流程的Rxjava源码,后面还会继续更新的。

上一篇下一篇

猜你喜欢

热点阅读