android

Rxjava源码解析--map(和一点just)

2019-03-26  本文已影响2人  二妹是只猫

map操作符的简单使用

Observable.just("1")
        .map(new Function<String, Integer>() {
          @Override
          public Integer apply(String s) throws Exception {
              return Integer.valueOf(s);
          }
        })
        .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer s) throws Exception {
            System.out.println(""+s.getClass());
          }
        });

Observable:

  public static <T> Observable<T> just(T item) {
      ObjectHelper.requireNonNull(item, "The item is null");
      return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
  }
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    ...省略代码
    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
}
  public static final class ScalarDisposable<T>extends AtomicInteger
  implements QueueDisposable<T>, Runnable {
      private static final long serialVersionUID = 3880992722410194083L;
      final Observer<? super T> observer;
      final T value;
      static final int START = 0;
      static final int FUSED = 1;
      static final int ON_NEXT = 2;
      static final int ON_COMPLETE = 3;

      public ScalarDisposable(Observer<? super T> observer, T value) {
          this.observer = observer;
          this.value = value;
      }


      @Override
      public void run() {
          if (get() == START && compareAndSet(START, ON_NEXT)) {
              observer.onNext(value);
              if (get() == ON_NEXT) {
                  lazySet(ON_COMPLETE);
                  observer.onComplete();
              }
          }
      }
  }

just创建处观察者Observable的子类ObservableJust,当被观察者调用订阅方法subscribe时,调用ObservableJust的subscribeActual方法,然后会调用ScalarDisposable的run方法,最终会调用观察者Observer.onNext(value)

  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.NONE)
  public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
      ObjectHelper.requireNonNull(mapper, "mapper is null");
      return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
  }

创建返回一个Observable的子类ObservableMap,并将Observable和创建的Function注入到其中。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }
      ...省略部分代码
    }
}

将Observerble和Function都注入到了ObserbleMap中。

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
1   public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
2   public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

 3          subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读