RxJava Source Code Analysis

2018-09-13  kjy_112233

Basic framework

        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

(1) Source analysis of the Observable creation flow
Source of the create method

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

The return type is Observable and the parameter is an ObservableOnSubscribe.
Source of ObservableOnSubscribe

@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

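ObservableOnSubscribe is an interface with a single method, subscribe(), which is the one we implemented in the example. Its parameter is an ObservableEmitter.
Source of ObservableEmitter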

public interface ObservableEmitter<@NonNull T> extends Emitter<T> {
    //code...
}

ObservableEmitter is also an interface. It extends the Emitter<T> interface.
Source of Emitter<T>

public interface Emitter<@NonNull T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

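Emitter defines the three methods we call most often inside our subscribe() implementation of ObservableOnSubscribe: onNext(), onError() and onComplete().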
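To make the shape of these interfaces concrete, here is a minimal sketch that does not depend on RxJava. MiniEmitter, MiniOnSubscribe and EmitterShapeDemo are hypothetical stand-ins invented for illustration; they only mirror the method signatures shown above, not the library's behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the three methods of Emitter<T>.
interface MiniEmitter<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Hypothetical stand-in that mirrors the single method of ObservableOnSubscribe<T>.
@FunctionalInterface
interface MiniOnSubscribe<T> {
    void subscribe(MiniEmitter<T> emitter) throws Throwable;
}

final class EmitterShapeDemo {
    // Runs the source against an emitter that records every signal as text.
    static List<String> record(MiniOnSubscribe<Integer> source) {
        List<String> log = new ArrayList<>();
        MiniEmitter<Integer> emitter = new MiniEmitter<Integer>() {
            @Override public void onNext(Integer value) { log.add("next:" + value); }
            @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        };
        try {
            source.subscribe(emitter);
        } catch (Throwable t) {
            // An exception thrown by the source is reported through onError.
            emitter.onError(t);
        }
        return log;
    }

    public static void main(String[] args) {
        // A single-method interface can be written as a lambda.
        System.out.println(record(e -> { e.onNext(1); e.onNext(2); e.onComplete(); }));
        // prints [next:1, next:2, complete]
    }
}
```

The source is nothing more than a callback that is handed an emitter; what the emitter does with each signal is decided by whoever creates it.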
Apart from the null check, the body of create() is a single statement: return RxJavaPlugins.onAssembly(new ObservableCreate<>(source))
Source of the RxJavaPlugins.onAssembly method

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

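When no assembly hook is registered (onObservableAssembly is null, which is the default), RxJavaPlugins.onAssembly returns source unchanged, i.e. the object that was passed in: new ObservableCreate<>(source)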
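The hook mechanism can be sketched in a few lines without RxJava. AssemblyHookDemo and its onAssemblyHook field are hypothetical names, and String stands in for Observable to keep the example small; in the real library the hook is installed with RxJavaPlugins.setOnObservableAssembly.

```java
import java.util.function.Function;

final class AssemblyHookDemo {
    // Hypothetical stand-in for the onObservableAssembly field: null means "no hook".
    static volatile Function<String, String> onAssemblyHook;

    // Same shape as onAssembly: apply the hook if one is set, otherwise pass through.
    static String onAssembly(String source) {
        Function<String, String> f = onAssemblyHook; // read the volatile field once
        if (f != null) {
            return f.apply(source);
        }
        return source;
    }

    public static void main(String[] args) {
        System.out.println(onAssembly("ObservableCreate")); // prints ObservableCreate
        onAssemblyHook = s -> "Traced(" + s + ")";
        System.out.println(onAssembly("ObservableCreate")); // prints Traced(ObservableCreate)
        onAssemblyHook = null; // reset so later calls pass through again
    }
}
```

Copying the field into a local variable before the null check means the hook cannot change between the check and the call.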
create() must return an Observable, but what we have is an ObservableOnSubscribe. ObservableCreate adapts the ObservableOnSubscribe into an Observable.
That completes the creation flow: we now hold an Observable<T>, which is really an ObservableCreate<T>.
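The adapter idea can be modeled as follows. This is a heavily simplified sketch, not RxJava's implementation: TinyObservable, TinyObservableCreate and AdapterDemo are hypothetical names, and a plain Consumer stands in for both the emitter and the observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simplified model of an observable type.
abstract class TinyObservable<T> {
    // Public entry point: validates the argument, then delegates to the subclass.
    final void subscribe(Consumer<T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    abstract void subscribeActual(Consumer<T> observer);

    // Factory in the spirit of create(): wrap the user callback in an adapter subclass.
    static <T> TinyObservable<T> create(Consumer<Consumer<T>> source) {
        return new TinyObservableCreate<>(source);
    }
}

// The adapter: holds the user callback and exposes it through the TinyObservable API.
final class TinyObservableCreate<T> extends TinyObservable<T> {
    private final Consumer<Consumer<T>> source;

    TinyObservableCreate(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    @Override
    void subscribeActual(Consumer<T> observer) {
        source.accept(observer); // the callback only runs once someone subscribes
    }
}

final class AdapterDemo {
    // Subscribes with an observer that collects every emitted value.
    static List<Integer> collect(TinyObservable<Integer> observable) {
        List<Integer> out = new ArrayList<>();
        observable.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        TinyObservable<Integer> o = TinyObservable.<Integer>create(emit -> {
            emit.accept(1);
            emit.accept(2);
            emit.accept(3);
        });
        System.out.println(o.getClass().getSimpleName()); // prints TinyObservableCreate
        System.out.println(collect(o));                   // prints [1, 2, 3]
    }
}
```

The caller only ever sees the abstract type, while the concrete object is the adapter that wraps the callback.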
(2) Analysis of the subscribe flow

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            //code...
            // the actual subscription happens here
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

    // ObservableCreate.subscribeActual
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // create a CreateEmitter, which is also an adapter
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
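        // onSubscribe() takes a Disposable, so CreateEmitter also adapts the Observer into a Disposable.
        // Note that onSubscribe() is called back on the thread that executes our subscribe() call;
        // it is not affected by thread scheduling.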
        observer.onSubscribe(parent);

        try {
            // connect the ObservableOnSubscribe source to the CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

        // ObservableCreate.CreateEmitter (static nested class)
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //code...
            // if not yet disposed, forward to the Observer's onNext()
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
            }
            // if not yet disposed, forward to the Observer's onError()
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose(); // always disposes itself afterwards
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            // if not yet disposed, forward to the Observer's onComplete()
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose(); // always disposes itself afterwards
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
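
The effect of the isDisposed() checks and the automatic dispose() can be seen in a small model. GatedEmitterDemo and GatedEmitter are hypothetical names; an AtomicBoolean is assumed here as a simplification of the AtomicReference<Disposable> state that the calls to get() and DisposableHelper in the excerpt rely on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class GatedEmitterDemo {
    // Hypothetical simplified emitter that forwards signals only while not disposed.
    static final class GatedEmitter {
        private final List<String> downstream;
        private final AtomicBoolean disposed = new AtomicBoolean();

        GatedEmitter(List<String> downstream) {
            this.downstream = downstream;
        }

        void onNext(int value) {
            if (!isDisposed()) {
                downstream.add("next:" + value);
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    downstream.add("complete");
                } finally {
                    dispose(); // a terminal signal always disposes the emitter
                }
            }
        }

        void dispose() {
            disposed.set(true);
        }

        boolean isDisposed() {
            return disposed.get();
        }
    }

    // Emits one value, completes, then tries to emit again.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        GatedEmitter emitter = new GatedEmitter(log);
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);    // dropped: already disposed
        emitter.onComplete(); // dropped: already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, complete]
    }
}
```

Because the terminal signal disposes the emitter in a finally block, anything the source emits afterwards never reaches the observer.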

Source of Observer

public interface Observer<@NonNull T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

Observer is an interface with just four methods, all of which are implemented in the example at the top.

Main design patterns used:
