RxJava1执行分析1

2018-02-13  本文已影响22人  lyzaijs

版本:RxJava1.3.0
主要分析subscribe()实现方式和原理。(过程在会涉及到操作符等,例如map...) 都知道Rx敲代码很爽快,主要归功于它可以将一个业务可以链式的实现(丰富的操作符)和方便线程调度

这里主要分析链式的实现

关键点: Observable OnSubscribe subscribe subscriber(Observer)
一句话串联,即:观察者(subscriber|observer)订阅(subscribe)消息(被观察者 Observable OnSubscribe )

示例:

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("goods");
                subscriber.onCompleted();
            }
        })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

但是我们常用的一般是Action1并不上面提到的subscriber,原来是已经封装成ActionSubscriber

    public final Subscription subscribe(final Action1<? super T> onNext) {
        ...
         Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }

继续往下找subscribe的最终实现:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     ...
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        ...
        try {
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            ...
            }
            return Subscriptions.unsubscribed();
        }
    }

关键点 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);(这里可以将代码简化为 observable.onSubscribe.call(subscriber))这行对以上所谓的 观察、订阅进行了串联。即你在create(OnSubscribe)中进行的onNext操作的对象就是Action1(即ActionSubscriber)。

有了以上基础,下面再来加入map操作符(这里才是重中之重)

上一篇 下一篇

猜你喜欢

热点阅读