RxJava(十二)--subscribe()订阅解析

2019-03-08  本文已影响0人  azu_test

介绍

只有调用了subscribe()方法,才能将Observable跟Observer关联起来,并触发Observable内处理器的执行。

执行代码

         //初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }
        });
         //初始化观察者Observer,视作结果接收器
        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };
        //订阅
        observable.subscribe(observer);

源码分析

1. 初始化被观察者Observable
  Observable  observable = Observable.create(数据处理器);

接下来会进入Observable#create方法

    Observable#create
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

RxJavaHooks.onCreate(f)此处不做分析了,知道最终返回的还是数据处理器即可

接下来会进入真正的初始化方法

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

由此可知被观察者Observable持有数据处理器对象Observable.OnSubscribe。

2. 初始化结果接受器观察者Observer
        Observer observer = new Observer<String>() {
           ...
        }
3. 订阅
        observable.subscribe(observer);

接下来回进入

    Observable#subscribe
    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }
    Observable#subscribe
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        //通知观察者做准备工作
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        try {
            //获取数据处理器Observable.OnSubscribe,并做数据处理工作
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

上面方法的主要功能代码均已标注说明。RxJavaHooks.onObservableStart()方法是用来获取当前被观察者的数据执行器。然后调用数据处理器的call()方法,此方法就是外部用户自己实现的方法。call()方法传递的参数就是结果接收器观察者Observer。

            数据处理器内的cal()l方法
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }

上面方法中的subscriber就是结果接受器Observer,经过上面执行onNext()``onCompleted()方法会进入对应的Observer的方法内。

接着就会进入结果接收器Observer内方法体内

        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };

最终输出结果

结果: 杨
结果: 月
结束
上一篇 下一篇

猜你喜欢

热点阅读