RxJava 1.x Source Code Analysis: The Basic Elements

2018-07-30  xoxoxiong

Versions used:
implementation 'io.reactivex:rxjava:1.3.8'
implementation 'io.reactivex:rxandroid:1.2.1'

A simple example

Let's start with a simple example:

Subscription subscription = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("123");
            subscriber.onNext("345");
            subscriber.onCompleted();
        }
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext:" + s);
    }
});

From this simple example we can see that the basic elements of RxJava 1 are:

  1. Observable
  2. Observer
  3. Subscription
  4. OnSubscribe
  5. Subscriber

Observable

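  1. The observed party (the source of events)
  2. Creates an observable sequence through Observable (the create method)
  3. Registers an observer through subscribe
    The source is too long to paste here.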

The Observer interface

  1. The observer
  2. Passed as the argument to Observable's subscribe method
public interface Observer<T> {

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onCompleted();

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(T t);

}

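The Subscription interface

  1. A subscription; it describes the relationship between the observable and the observer
  2. Used to cancel the subscription and to query its current state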
public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}
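
To make the two-method contract concrete, here is a minimal sketch of a flag-based subscription. MiniSubscription and FlagSubscription are hypothetical names invented for this illustration and are not RxJava classes; the sketch only assumes that unsubscribing should be idempotent and that the state can be queried afterwards.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in that mirrors the two methods of rx.Subscription. */
interface MiniSubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Runs a cleanup action at most once, on the first unsubscribe() call. */
final class FlagSubscription implements MiniSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final Runnable onUnsubscribe;

    FlagSubscription(Runnable onUnsubscribe) {
        this.onUnsubscribe = onUnsubscribe;
    }

    @Override
    public void unsubscribe() {
        // compareAndSet lets only the first caller run the cleanup
        if (unsubscribed.compareAndSet(false, true)) {
            onUnsubscribe.run();
        }
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed.get();
    }
}

class SubscriptionDemo {
    public static void main(String[] args) {
        int[] cleanups = {0};
        MiniSubscription s = new FlagSubscription(() -> cleanups[0]++);
        System.out.println("before: " + s.isUnsubscribed());
        s.unsubscribe();
        s.unsubscribe(); // second call is a no-op
        System.out.println("after: " + s.isUnsubscribed() + ", cleanups: " + cleanups[0]);
    }
}
```

A producer would typically check isUnsubscribed() before emitting, as the call method in the example at the top of the article does.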

The OnSubscribe interface

  1. Invoked when a subscription occurs
  2. Lives inside Observable; its actual job is to emit data to the subscriber
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

public interface Action1<T> extends Action {
    void call(T t);
}

The Subscriber class

  1. An abstract class that implements both Observer and Subscription

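Method analysis

The example code relies mainly on two methods:

The create() method

Let's analyze create() first. The source shows that this is where the Observable object is created: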

/**
     * Constructs an Observable in an unsafe manner, that is, unsubscription and backpressure handling
     * is the responsibility of the OnSubscribe implementation.
     * @param <T> the value type emitted
     * @param f the callback to execute for each individual Subscriber that subscribes to the
     *          returned Observable
     * @return the new Observable instance
     * @deprecated 1.2.7 - inherently unsafe, use the other create() methods for basic cases or
     * see {@link #unsafeCreate(OnSubscribe)} for advanced cases (such as custom operators)
     * @see #create(SyncOnSubscribe)
     * @see #create(AsyncOnSubscribe)
     * @see #create(Action1, rx.Emitter.BackpressureMode)
     */
    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        // Returns an Observable whose onSubscribe is the f that was passed in
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

  /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #unsafeCreate(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     *
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }


/**
     * Hook to call when an Observable is created.
     * @param <T> the value type
     * @param onSubscribe the original OnSubscribe logic
     * @return the original or replacement OnSubscribe instance
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
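        // onObservableCreate is assigned inside RxJavaHooks (see initCreate() below). By default it just
        // returns the value it receives, so the result here is still the original onSubscribe.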
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

 static void initCreate() {
        onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                // As the code below shows, this returns the f that was passed in
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };


       ...
}

/**
     * Invoked during the construction by {@link Observable#unsafeCreate(OnSubscribe)}
     * <p>
     * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
     * logging, metrics and other such things and pass through the function.
     *
     * @param <T> the value type
     * @param f
     *            original {@link OnSubscribe}<{@code T}> to be executed
     * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
     *         returned as a pass through
     */
    @Deprecated
    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }

From the code above it is clear that create() builds an Observable object and hands it the OnSubscribe object that was passed in.
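
The pass-through behaviour of the creation hook can be sketched in a few lines. This is a simplified illustration, not the RxJava implementation: MiniOnSubscribe, MiniHooks and MiniSource are hypothetical names, and the sketch uses a null hook to mean "nothing installed", whereas RxJavaHooks installs a default function that delegates to the plugin hook and returns f.

```java
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for Observable.OnSubscribe; it writes into a StringBuilder instead of a Subscriber. */
interface MiniOnSubscribe {
    void call(StringBuilder sink);
}

final class MiniHooks {
    // null means no hook is installed, so callbacks pass through unchanged (an assumption of this sketch)
    static volatile UnaryOperator<MiniOnSubscribe> onCreateHook;

    static MiniOnSubscribe onCreate(MiniOnSubscribe f) {
        UnaryOperator<MiniOnSubscribe> hook = onCreateHook;
        return hook != null ? hook.apply(f) : f;
    }
}

final class MiniSource {
    final MiniOnSubscribe onSubscribe;

    private MiniSource(MiniOnSubscribe f) {
        this.onSubscribe = f;
    }

    // Same shape as create() in the article: run the hook, then store the result
    static MiniSource create(MiniOnSubscribe f) {
        return new MiniSource(MiniHooks.onCreate(f));
    }
}

class HookDemo {
    public static void main(String[] args) {
        MiniOnSubscribe f = sink -> sink.append("x");

        // Without a hook the stored callback is the very instance passed in
        System.out.println(MiniSource.create(f).onSubscribe == f);

        // With a hook installed the callback can be decorated or replaced
        MiniHooks.onCreateHook = original -> sink -> {
            sink.append("[");
            original.call(sink);
            sink.append("]");
        };
        StringBuilder out = new StringBuilder();
        MiniSource.create(f).onSubscribe.call(out);
        System.out.println(out);
        MiniHooks.onCreateHook = null;
    }
}
```

The point of the indirection is that logging or metrics can be attached globally without changing any call site of create().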

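The subscribe() method

This works like subscribing in the observer pattern: the Observer subscribes to changes in the Observable, and the OnSubscribe's call method is what notifies the Observer. The code shows the exact flow: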

/**
     * Subscribes to an Observable and provides an Observer that implements functions to handle the items the
     * Observable emits and any error or completion notification it issues.
     * <dl>
     *  <dd><b>Backpressure:</b><dt>
     *  <dd>The operator consumes the source {@code Observable} in an unbounded manner (i.e., no
     *  backpressure is applied to it).</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param observer
     *             the Observer that will handle emissions and notifications from the Observable
     * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before
     *         the Observable has completed
     * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
     */
    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            // The observer is already a Subscriber, so cast it and call subscribe(Subscriber) directly
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        // Otherwise wrap the Observer in an ObserverSubscriber and subscribe that. We trace subscribe() first
        // and come back to ObserverSubscriber later.
        return subscribe(new ObserverSubscriber<T>(observer));
    }

 /**
     * Subscribes to an Observable and provides a Subscriber that implements functions to handle the items the
     * Observable emits and any error or completion notification it issues.
     * <p>
     * A typical implementation of {@code subscribe} does the following:
     * <ol>
     * <li>It stores a reference to the Subscriber in a collection object, such as a {@code List<T>} object.</li>
     * <li>It returns a reference to the {@link Subscription} interface. This enables Subscribers to
     * unsubscribe, that is, to stop receiving items and notifications before the Observable completes, which
     * also invokes the Subscriber's {@link Subscriber#onCompleted onCompleted} method.</li>
     * </ol><p>
     * An {@code Observable<T>} instance is responsible for accepting all subscriptions and notifying all
     * Subscribers. Unless the documentation for a particular {@code Observable<T>} implementation indicates
     * otherwise, Subscriber should make no assumptions about the order in which multiple Subscribers will
     * receive their notifications.
     * <p>
     * For more information see the
     * <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation</a>.
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
     *  behavior.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param subscriber
     *            the {@link Subscriber} that will handle emissions and notifications from the Observable
     * @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can
     *         unsubscribe from the Observable
     * @throws IllegalStateException
     *             if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function
     * @throws IllegalArgumentException
     *             if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null}
     * @throws OnErrorNotImplementedException
     *             if the {@link Subscriber}'s {@code onError} method is null
     * @throws RuntimeException
     *             if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable}
     * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
     */
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    // This is the key method, so let's look at it closely. Its Observable parameter is the instance that subscribe() was called on.
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            // The key step: this effectively calls observable.onSubscribe.call(subscriber), which connects everything
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

/**
     * Hook to call before the child subscriber is subscribed to the OnSubscribe action.
     * @param <T> the value type
     * @param instance the parent Observable instance
     * @param onSubscribe the original OnSubscribe action
     * @return the original or alternative action that will be subscribed to
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

 static void init() {
        onError = new Action1<Throwable>() {
            @Override
            public void call(Throwable e) {
                RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
            }
        };

        onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
            }
        };
}

  @Deprecated
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        // As you can see, this simply returns the onSubscribe that was passed in, i.e. the one given to create() in our example.
        return onSubscribe;
    }
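
Stripped of hooks, SafeSubscriber wrapping and unsubscription, the flow traced above reduces to "subscribe calls onSubscribe.call with the observer". The following sketch reproduces only that core under those simplifying assumptions. TinyObserver, TinyOnSubscribe and TinyObservable are hypothetical names made up for this illustration, and the error handling is far cruder than in RxJava.

```java
import java.util.ArrayList;
import java.util.List;

interface TinyObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onCompleted();
}

interface TinyOnSubscribe<T> {
    void call(TinyObserver<? super T> observer);
}

final class TinyObservable<T> {
    private final TinyOnSubscribe<T> onSubscribe;

    private TinyObservable(TinyOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // create() only stores the callback; nothing is emitted yet
    static <T> TinyObservable<T> create(TinyOnSubscribe<T> f) {
        return new TinyObservable<>(f);
    }

    // subscribe() is where the stored callback finally runs
    void subscribe(TinyObserver<? super T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        try {
            onSubscribe.call(observer);
        } catch (RuntimeException e) {
            // simplified: route failures of the callback to onError
            observer.onError(e);
        }
    }
}

class TinyDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        TinyObservable<String> source = TinyObservable.create(o -> {
            o.onNext("123");
            o.onNext("345");
            o.onCompleted();
        });
        source.subscribe(new TinyObserver<String>() {
            @Override public void onNext(String s) { events.add("onNext:" + s); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext:123, onNext:345, onCompleted]
    }
}
```

Because the callback only runs inside subscribe(), creating the source has no side effects, and each new subscription runs the callback again.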

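Now let's look at the source of ObserverSubscriber. It simply wraps an Observer, and the actual calls still go to that Observer: its onNext, onError and onCompleted methods each delegate to the corresponding Observer method.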

/**
 * Wraps an Observer and forwards the onXXX method calls to it.
 * @param <T> the value type
 */
public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer<? super T> observer;

    public ObserverSubscriber(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onCompleted() {
        observer.onCompleted();
    }
}
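
The "use it directly if it is already a Subscriber, otherwise wrap it" decision made by subscribe(Observer) can be sketched as follows. PlainObserver, BaseSubscriber, ForwardingSubscriber and Wrapping are hypothetical names for this illustration only; they imitate the roles of Observer, Subscriber and ObserverSubscriber without reproducing the RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

interface PlainObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onCompleted();
}

/** An observer that also carries subscription state, playing the role of Subscriber. */
abstract class BaseSubscriber<T> implements PlainObserver<T> {
    private volatile boolean unsubscribed;

    public void unsubscribe() { unsubscribed = true; }
    public boolean isUnsubscribed() { return unsubscribed; }
}

/** Forwards every event to the wrapped observer, playing the role of ObserverSubscriber. */
final class ForwardingSubscriber<T> extends BaseSubscriber<T> {
    private final PlainObserver<? super T> observer;

    ForwardingSubscriber(PlainObserver<? super T> observer) {
        this.observer = observer;
    }

    @Override public void onNext(T t) { observer.onNext(t); }
    @Override public void onError(Throwable e) { observer.onError(e); }
    @Override public void onCompleted() { observer.onCompleted(); }
}

final class Wrapping {
    static <T> BaseSubscriber<T> toSubscriber(PlainObserver<T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        if (observer instanceof BaseSubscriber) {
            // already has subscription state: reuse it as is
            return (BaseSubscriber<T>) observer;
        }
        // plain observer: add subscription state by wrapping
        return new ForwardingSubscriber<>(observer);
    }
}

class WrappingDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        PlainObserver<String> plain = new PlainObserver<String>() {
            @Override public void onNext(String s) { seen.add(s); }
            @Override public void onError(Throwable e) { seen.add("error"); }
            @Override public void onCompleted() { seen.add("done"); }
        };
        BaseSubscriber<String> wrapped = Wrapping.toSubscriber(plain);
        wrapped.onNext("a");
        wrapped.onCompleted();
        System.out.println(seen);
        System.out.println(Wrapping.toSubscriber(wrapped) == wrapped);
    }
}
```

Wrapping keeps the rest of the pipeline uniform: everything downstream of subscribe can assume it is dealing with an object that has both the observer callbacks and the subscription state.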
