Rxjava源码解析--map(和一点just)
2019-03-26 本文已影响2人
二妹是只猫
map操作符的简单使用
Observable.just("1")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
System.out.println(""+s.getClass());
}
});
- just(快速创建一个观察者Observerble并发送订阅消息):
Observable:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
...省略代码
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
}
public static final class ScalarDisposable<T>extends AtomicInteger
implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
final Observer<? super T> observer;
final T value;
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
just创建处观察者Observable的子类ObservableJust,当被观察者调用订阅方法subscribe时,调用ObservableJust的subscribeActual方法,然后会调用ScalarDisposable的run方法,最终会调用观察者Observer.onNext(value)
- map:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
创建返回一个Observable的子类ObservableMap
,并将Observable和创建的Function注入到其中。
- ObservableMap:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
...省略部分代码
}
}
将Observerble和Function都注入到了ObserbleMap中。
- 通过
subscribe
进行订阅
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
1 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
2 public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
3 subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
-
链式调用的代码中我们发现没有创建观察者,通过阅读源码发现标记
1
的subscribe方法中创建出来的观察者LambdaObserver
,接下来调用标记2
的subscribe,最终在标记3
处调用了ObservableMap
的subscribeActual方法将上面创建出来的观察者LambdaObserver注入并转换成MapObserver。 -
最终在MapObserver中通过onNext调用到apply得到转换过的数据,其中actual.onNext(v)将其输出,actual也就是之前注入的观察者
LambdaObserver
,这样一个简单的map流程完成。