Android-RxJavaAndroid-Rxjava&retrofit&dagger

理解RxJava(一):基本流程源码分析

2017-11-29  本文已影响39人  却把清梅嗅

最近一直没有机会,好好写博客,可能还是太浮躁了,自己对自己的这种状态也不是特别满意。近几日准备安下心来,好好研究一下RxJava,把这期间的所得总结成一个系列,尽量都写博客中,看看这个阶段结束时自己能达到怎么样的程度。

概述

在日常的Android项目开发中,RxJava+Retrofit是一个万金油网络请求框架,通常情况下,我们的代码大概是这样的:

 public void requestUserInfo(String userName) {
    serviceManager.getUserInfoService()
                  .getUserInfo(userName)
                  .subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(info -> {
                      //do something
                  });
 }

事实上,我很少去关心RxJava内部是如何实现的,因为这样我也能够轻松定制项目中的网络请求需求,并且写出看起来还不错的代码。

但是很快我就事与愿违了,因为在测试RxJava相关的业务代码时,面对需要测试的代码块,我束手无策,我接连踩到了很多坑。虽然最后我都通过万能的google一一解决了,但是我已经不满足现状,我可以轻易提出很多个让我疑惑的问题:

1 RxJavaPlugin是干什么的
2 链式调用中当存在数个Observable.subscribeOn()时的处理方式
3 链式调用中当存在数个Observable.observeOn()时的处理方式
4 数据是如何经过操作符进行的处理
5 各个doOnXXX()方法回调所在的线程?

等等......

于是我就有了这个念头:研究源码,彻底将它整明白。

源码版本

RxJava官方Github: https://github.com/ReactiveX/RxJava

Branch:2.x (RxJava2)

版本号:2.1.3

一.基本流程

我决定从简单的代码入手分析RxJava2:

public class RxJavaTest {

    @Test
    public void test() throws Exception {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
             }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(Integer integer) {
                System.out.println(String.valueOf(integer));
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }
}

代码很简单,我们可以简单分成两步:

1.Observable.create()
2.Observable.subscribe()

一步步进行分析。

二.Observable.create()

我们点击进入Observable.create()方法的内部:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //...(表示排除非关键代码,下同)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

可以看到,create方法的返回值仍然是一个Observable,这是当然的,我们看到RxJavaPlugins.onAssembly():


/**
 * Calls the associated hook function.
 */
 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    //...
 }

这是一个hook方法,我们暂时先忽略它,默认这个方法就是返回了直接返回source,即传入的参数对象,那么实际上,create方法的关键就是下面这行代码:

 return new ObservableCreate<T>(source);

来看看这个ObservableCreate是何方神圣:


//1.ObservableCreate是Observable的子类
public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;
    
    //2.注意这里,实际上Observable.create方法执行了下面的代码
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    //3.subscribeActual方法是Observable.subscribe()后才执行的
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

我已经把重点标记了,可以得知:

现在关于ObservableCreate这个类本身我们基本都了解了,我们来看看我们传进来的ObservableOnSubscribe是什么:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

原来ObservableOnSubscribe只是一个接口,它就是我们传进来的这个:


Observable.create(new ObservableOnSubscribe<String>() {
            //就是它
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
             }
        }).subscribe(...);

实际上,ObservableOnSubscribe就是让我们去实现的接口,我们将要发射的事件都通过ObservableEmitter发射器对象进行发射(本文中是依次发射Integer 1、2),可以说,在本文的案例中,ObservableOnSubscribe就是数据流的源头。

整理思路

现在我们已知的情报,在Observable调用create()方法时:

现在基本已经清楚了,但是我们还有两个问题需要解答:

1.ObservableEmitter发射器对象何时实例化并发射数据?
2.接下来如何才能实现Observable的订阅?

三.Observable.subscribe()

我们点击进入查看Observable.subscribe()源码:

public final void subscribe(Observer<? super T> observer) {
        //...
        try {
            //...
            //1.我们只需要关注下面这行代码
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) { 
            //...
        }
    }

抛开非核心代码和hook相关代码,我们可以看到,当我们调用 Observable.subscribe(observer)方法时,实际上就是调用Observable.subscribeActual(observer)方法。

应该清楚的记得,在本文中,此时执行subscribe()方法的Observable,实际上应该就是ObservableCreate对象。

因此,Observable.subscribe(observer),实际上执行的是下面的代码:

public final class ObservableCreate<T> extends Observable<T> {
    
    final ObservableOnSubscribe<T> source;
    
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    //真正执行的代码
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1.请注意,参数中的observer实际上就是我们要打印onNext的observer
        //2.实例化CreateEmitter对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        //3.执行observer的onSubscribe方法,这里可以看到:
        //  onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程
        //  和subscribeOn()/observeOn()无关!
        observer.onSubscribe(parent);

        try {
            //4.ObservableOnSubscribe执行subscribe()方法
            //这个方法就是我们在create()中定义的代码
            //其实就是发射数据源,parent是CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
        
            //5.异常处理
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

整理思路

我把需要解释的基本都放到了注释中,简单总结一下流程:

public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
}

现在的问题是,仅仅通过发射器ObservableEmitter.onNext(1),我们需要的打印代码哪里去了?

Observable.create(//...)
        .subscribe(new Observer<Integer>() {
        
            //...
            
            @Override
            public void onNext(Integer integer) {
                System.out.println(String.valueOf(integer));
            }
            
            //...
        
            
        });

我们拐回到这行代码:

protected void subscribeActual(Observer<? super T> observer) {
    //...
    
    //2.实例化CreateEmitter对象
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);

    //...
}

我们来看看CreateEmitter这个类:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    
    // 1.请注意,CreateEmitter实现了Disposable接口
    // 这也就说明了在执行上面subscribeActual()步骤3 observer.onSubscribe()的回调时,可以将ObservableEmitter作为参数传入了
    // ObservableEmitter也是Disposable
    
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;
        
        //2.初始化时,会将打印数据的回调Observer作为成员储存
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //3.在ObservableEmitter.onNext()发射数据时,实际上调用了observer的onNext()回调打印数据
                observer.onNext(t);
            }
        }
        
        // 4(额外):当执行onError()时,会执行dispose()方法取消订阅
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
        
        // 5(额外):当执行onError()时,会执行dispose()方法取消订阅
        //这也就说明为什么 onComplete()和onError()方法只能触发其中的一个了
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

基本都写到了注释中,应该不难理解。

总结

分析一下各个类的职责:

简单总结一下:

1.Observable.create(),实例化ObservableCreate和ObservableOnSubscribe,并存储数据源发射行为,准备发射(我已经准备好数据源,等待被订阅)
2.Observable.subscribe(),实例化ObservableEmitter(发射器ObservableEmitter准备好!数据发射后,数据处理方式Observer已准备好!)
3.执行Observer.onSubscribe()回调,ObservableEmitter作为Disposable参数传入
4.执行ObservableOnSubscribe.subscribe()方法 (ObservableEmitter发射数据,ObservableEmitter内部的Observer处理数据)

此外我们还可以得到的:

5.CreateEmitter 中,只有Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法。

6.Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。

7.onSubscribe()是在我们执行subscribe()这句代码的那个线程回调的,并不受线程调度影响。

参考

1.RxJava2 源码解析(一)@张旭童
http://www.jianshu.com/p/23c38a4ed360

2.RxJava2 源码解析——流程 @Robin_Lrange
http://www.jianshu.com/p/e5be2fa8701c

上一篇下一篇

猜你喜欢

热点阅读