android

Rxjava源码解析--基本用法源码分析

2019-03-26  本文已影响4人  二妹是只猫

观察者(订阅)模式
想了解rxjava就绕不过观察者模式,在观察者(订阅)模式文中对该模式有一个基本的介绍。

创建于使用

Observable observable = Observable.create(new 
        ObservableOnSubscribe<Object>() {
     @Override
      public void subscribe(ObservableEmitter<Object> emitter) 
            throws Exception {
          emitter.onNext("hello");
      }
 });
      Observer observer = new Observer() {
          @Override
          public void onSubscribe(Disposable d) {

          }

          @Override
          public void onNext(Object o) {
              System.out.println("{}"+o);
          }

          @Override
          public void onError(Throwable e) {

          }

          @Override
          public void onComplete() {

          }
      };
  observable.subscribe(observer);

这样一个简单的Rxjava就创建成功并成功订阅了,当运行上方代码时,订阅者Observer的onNext方法会被调用并接收到“hello”

现在来看看观察者、被观察者是如何被创建并关联上的

  @SchedulerSupport(SchedulerSupport.NONE)
  public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
      ObjectHelper.requireNonNull(source, "source is null");
      return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
  }
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}

create方法返回一个Observable的子类ObservableCreate,给其中ObservableOnSubscribe赋值。注意subscribeActual方法 ,之后调用被观察者订阅观察者的subscribe最终就是执行的这儿。

public interface Observer<T> {

  void onSubscribe(@NonNull Disposable d);

  void onNext(@NonNull T t);

  void onError(@NonNull Throwable e);

  void onComplete();

}

Observer很简单就是一个接口

  public final void subscribe(Observer<? super T> observer) {
      ObjectHelper.requireNonNull(observer, "observer is null");
      try {
          observer = RxJavaPlugins.onSubscribe(this, observer);

          ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

          subscribeActual(observer);
      } catch (NullPointerException e) { // NOPMD
          throw e;
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          // can't call onError because no way to know if a Disposable has been set or not
          // can't call onSubscribe because the call might have set a Subscription already
          RxJavaPlugins.onError(e);

          NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
          npe.initCause(e);
          throw npe;
      }
  }

通过上面subscribeActual(observer)这个抽象方法,调用到Observerble的实现类ObserverblecreatesubscribeActual(observer)(果然如我们上面所说的):

public final class ObservableCreate<T> extends Observable<T> {
  final ObservableOnSubscribe<T> source;

  public ObservableCreate(ObservableOnSubscribe<T> source) {
      this.source = source;
  }

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
      CreateEmitter<T> parent = new CreateEmitter<T>(observer);
      observer.onSubscribe(parent);

      try {
          source.subscribe(parent);
      } catch (Throwable ex) {
          Exceptions.throwIfFatal(ex);
          parent.onError(ex);
      }
  }

  static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {

      private static final long serialVersionUID = -3434801548987643227L;

      final Observer<? super T> observer;

      CreateEmitter(Observer<? super T> observer) {
          this.observer = observer;
      }

      @Override
      public void onNext(T t) {
          if (t == null) {
              onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
              return;
          }
          if (!isDisposed()) {
              observer.onNext(t);
          }
      }

      @Override
      public void onError(Throwable t) {
          if (!tryOnError(t)) {
              RxJavaPlugins.onError(t);
          }
      }
    ...省略代码
}

这里创建了CreateEmitter,它实现了ObservableEmitter接口,这里就将我们刚开始创建的被观察者的subcriibe(ObservableEmitter<Object> emitter)关联了起来。最终我们我们在Observable的subscribe方法中调用的emitter.onNext("hello")就是它实现的:

到这里我们的rxjava示例就完整的运行完毕了,最终就是Observable的实ObservableCreateObserver关联,并通过CreateEmitter发送通知消息,1中调用到observer的onNext(t),这样一个简单的rxjava就实现了。

上一篇 下一篇

猜你喜欢

热点阅读