androidAndroidAndroid开发经验谈

RxJava 2.x 源码分析(一)

2018-05-11  本文已影响42人  zYoung_Tang

主要介绍 Rxjava 是如何利用观察者模式实现通信

依赖

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

核心类和接口

Observable (被观察者)

被观察者必须继承的抽象类

// T: `被观察者`发射的 item 的类型
public abstract class Observable<T> implements ObservableSource<T>
Observer (观察者)

观察者必须实现的接口

public interface Observer<T>
Emitter (发射器)

用于被观察者发射信息给观察者

// T: 发射的 item 类型
public interface ObservableEmitter<T> extends Emitter<T>

例子使用到的的相关类

ObservableCreate

被观察者的子类,Observable.create() 方法返回 Observable 就是该类的实例

public final class ObservableCreate<T> extends Observable<T>
CreateEmitter(发射器)

ObservableCreate 的内部类,实现了ObservableEmitter

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable
Disposable

一个提供断开操作的接口,所有需要做断开操作的类都要实现该接口

/**
 * Represents a disposable resource.
 */
public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}
ObservableOnSubscribe

一个简单的功能接口,只负责给被观察者提供订阅方法和传一个发射器实例给被观察者

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

从一个简单的例子查看源码:

private void RxJava() {
    // 第一步:初始化'被观察者' observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            // 传一个事件给观察者观察
            e.onNext(1);
            e.onNext(2);
            // 调用 onComplete 方法后调用接收不到后面的事件
            e.onComplete();
            // 向观察者传一个异常事件,并且观察者接收不到后面的事件
            //e.onError(new NullPointerException("空指针"));
            e.onNext(3);
            e.onNext(4);
        }
    });

    // 第二步:初始化'观察者' observer
    Observer<Integer> observer = new Observer<Integer>() {
        private int i;
        private Disposable mDisposable;

        /**
         * 该方法是订阅操作中第一个调用的回调方法
         * 提供了一个可主动切断的对象
         */
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            mDisposable = d;
        }

        /**
         * `被观察者`调用该方法并传给`观察者`一个 T 类型的 item
         */
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG, "Observer 接收到: " + integer);
            i++;
            if (i == 2) {
                // Disposable 用来切断观察者与被观察者之间联系,被观察者之后的事件不再传给观察者
                mDisposable.dispose();
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "Observer onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "Observer onComplete");
        }
    };

    // 第三步:订阅
    observable.subscribe(observer);
}

第一步中create()中进入源码:

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 判断传入的 ObservableOnSubscribe 对象是否为 null
    ObjectHelper.requireNonNull(source, "source is null");
    // 通过 RxJavaPlugins 组装并返回 ObservableCreate 的实例
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

Observable 是一个抽象类,当我们使用 Observable.create 方法创建被观察者 observable 的时候实际上是创建一个 ObservableCreate 对象。

订阅方法 subscribe 中实际进行订阅操作的是一个抽象方法 subscribeActual:

// Observable.java
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 实际进行订阅的操作
        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
         ...
    }
}

下面是 ObservableCreate 的实现:

// ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
    // 创建发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 调用'观察者'的 onSubscribe 方法
    observer.onSubscribe(parent);
    try {
        // source : 被观察者
        // parent : 发射器
        // 这里回调我们覆写的 Observable 的 subscribe 方法,开始发射事件操作
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

subscribeActual订阅操作方法调用顺序:

观察者onSubscribe方法 -> 被观察者subscribe方法 -> 被观察者发射操作onNext() -> 观察者onNext()

从源码中得知,我们需要覆写的subscribe方法中得到的 e 实际上就是CreateEmitter,当我们调用e.onNext(1)的时候实际上就是调用 CreateEmitteronNext

// ObservableCreate.CreateEmitter.java
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    // 判断被观察者是否已断开了
    if (!isDisposed()) {
        // 回调我们覆写的观察者的 onNext 方法
        observer.onNext(t);
    }
}

CreateEmitteronNext()中我们可以知道只有当被观察者没有断开的时候才会回调观察者onNext()

// ObservableCreate.CreateEmitter
@Override
public void onComplete() {
    if (!isDisposed()) { //检测是否已断开
        try {
            // 把Complete事件发射给观察者
            observer.onComplete();
        } finally {
            // 断开操作
            dispose();
        }
    }
}
        
@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}

@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            // onError() 方法最终也会调用该方法
            dispose();
        }
        return true;
    }
    return false;
}

@Override
public void dispose() {
    // 断开操作
    DisposableHelper.dispose(this);
}

很多人都知道 onError() 和 onComplete() 不能并存,从上面源码可以看到两个方法最终都会执行 dispose() ,所以无论执行哪个方法最终都会切断被观察者观察者之间的联系,之后调用的方法都会失效。且由DisposableHelper这个枚举类实现具体操作

public enum DisposableHelper implements Disposable {
    /**
     * The singleton instance representing a terminal, disposed state, don't leak it.
     */
    // 用来标记断开状态
    DISPOSED
    ;

    /**
     * Atomically disposes the Disposable in the field if not already disposed.
     * @param field the target field
     * @return true if the current thread managed to dispose the Disposable
     */
    public static boolean dispose(AtomicReference<Disposable> field) {
        /**
        * 还记得 CreateEmitter 继承了 AtomicReference 吗
        * 下面的操作属于原子性操作:
        */
        // field: CreateEmitter
        // 首次调用时获取的 current 为 null
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) { 
            // getAndSet 操作是把 d 的值赋给 field,但是返回旧值(null)
            // 下一次执行该 dispose 方法时 field.get()的值就会变成 DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
    
    /**
     * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
     * @param d the disposable to check
     * @return true if d is {@link #DISPOSED}
     */
    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }
}

这里我们注意到断开操作的核心就是让发射器继承原子性引用类 AtomicReference 保存和改变状态,防止多线程并发出现异常


总结

参考文章

上一篇下一篇

猜你喜欢

热点阅读