RxJava2 源码总结

2017-12-31  本文已影响26人  SScience

总感觉跟着源码走过程过段时间又会忘记,又得翻一遍源码,所以在第一次学习源码时,把领悟的关键点记录下来,以后回看只要稍微浏览下源码,就能迅速明白它的思想(也许以后还会有更深刻的理解😂)。

本学习源码基于 RxJava 2.1.7
源码地址:https://github.com/ReactiveX/RxJava

一,事件产生和消费--create()

首先,以最简单的使用方式来说:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(">>>>>>>>", "onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.e(">>>>>>>>", "onNext:" + integer);
    }
    @Override
    public void onError(Throwable e) {
         Log.e(">>>>>>>>", "onError:" + e.toString());
    }
    @Override
    public void onComplete() {
        Log.e(">>>>>>>>", "onComplete");
    }
});
// ObservableCreate#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent); 
    source.subscribe(parent);
    ...
}

二,操作符--map()

Observable.create(...) 
        .map(new Function<Integer, String>() {
             @Override
             public String apply(Integer integer) throws Exception {
                 return String.valueOf(integer);
             }
         })
         .subscribe(···);
// ObservableMap#subscribeActual
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

三,线程调度--subscribeOn()

Observable.create(...) 
        .subscribeOn(Schedulers.io())
        .subscribe(···);
// ObservableSubscribeOn#subscribeActual
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() { 
            source.subscribe(parent);
        }
    }));
}

三,线程调度--observeOn()

Observable.create(...) 
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(···);
// ObservableObserveOn#subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
    ...
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 
}
新年快乐,2018

本人水平有限,如有错误,欢迎批评指出😊

上一篇下一篇

猜你喜欢

热点阅读