Observable.mergeDelayError Source Code Analysis
-
Purpose
Combines multiple Observables into one. Any onError notifications passed from any of the source Observables will be withheld until all merged Observables complete, and only then will be passed along to the observers. In other words, an error from one source does not cut the other sources short: it is held back until the remaining sources have finished and is then delivered to the Observer.
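The behaviour described above can be sketched without RxJava. The following is a minimal, sequential model only: `SketchSource` and `DelayErrorMergeSketch` are hypothetical names invented for this illustration, and the real operator subscribes to its sources through flatMap rather than looping over them like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a source: pushes items into a sink and may fail by throwing.
interface SketchSource<T> {
    void emitTo(Consumer<T> sink) throws Exception;
}

final class DelayErrorMergeSketch {

    // Runs every source in order; failures are collected instead of stopping the merge.
    static <T> List<Throwable> mergeDelayError(List<SketchSource<T>> sources, Consumer<T> sink) {
        List<Throwable> errors = new ArrayList<>();
        for (SketchSource<T> source : sources) {
            try {
                source.emitTo(sink);
            } catch (Exception e) {
                errors.add(e); // withhold the error and keep going
            }
        }
        return errors; // handed over only after all sources have finished
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        List<SketchSource<String>> sources = new ArrayList<>();
        sources.add(sink -> { throw new IllegalArgumentException("boom"); });
        sources.add(sink -> { sink.accept("Four"); sink.accept("Five"); sink.accept("Six"); });

        List<Throwable> errors = mergeDelayError(sources, received::add);
        // prints: [Four, Five, Six] then 1 delayed error(s)
        System.out.println(received + " then " + errors.size() + " delayed error(s)");
    }
}
```

The failing source comes first, yet all three strings from the second source are still received before the error is reported.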
-
Source Code Analysis
-
Example
    Observable<String> observable1 = Observable.error(new IllegalArgumentException(""));
    Observable<String> observable2 = Observable.just("Four", "Five", "Six");
    Observable.mergeDelayError(observable1, observable2)
            .subscribe(System.out::println);
-
Walkthrough
The mergeDelayError method:
    public static <T> Observable<T> mergeDelayError(
            @NonNull ObservableSource<? extends T> source1,
            @NonNull ObservableSource<? extends T> source2) {
        Objects.requireNonNull(source1, "source1 is null");
        Objects.requireNonNull(source2, "source2 is null");
        return fromArray(source1, source2).flatMap((Function)Functions.identity(), true, 2);
    }
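The two sources are wrapped with fromArray, and flatMap is called with the identity mapper, delayErrors = true and maxConcurrency = 2. The call ends up in the four-argument flatMap overload, which returns an ObservableFlatMap object: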
    public final <R> Observable<R> flatMap(
            @NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        Objects.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarSupplier) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarSupplier<T>)this).get();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
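Following the way RxJava assembles operators, the resulting chain is:
1. ObservableFromArray [containing ObservableError [JustValue [IllegalArgumentException]] and ObservableFromArray ["Four", "Five", "Six"]]
2. ObservableFlatMap [wrapping the ObservableFromArray from step 1, with one notable field: delayErrors is true]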
Calling subscribe leads to ObservableFlatMap's subscribeActual:
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
Here t is a LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer()). The method calls source.subscribe, and since ObservableFromArray is the source of ObservableFlatMap, execution continues in ObservableFromArray's subscribeActual:
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);
        observer.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }
The observer is the MergeObserver passed in above. As elsewhere in RxJava, the onSubscribe pre-event is handled before any main event, so its onSubscribe is called first:
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }
The FromArrayDisposable is assigned to upstream, and then downstream's onSubscribe is called, which is LambdaObserver's onSubscribe:
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                d.dispose();
                onError(ex);
            }
        }
    }
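This calls accept on onSubscribe, which is the Functions.emptyConsumer() passed in when the LambdaObserver was constructed, so nothing happens here. To do some work before the main events of a subscription run, a Consumer<Disposable> can be supplied in that position and the preparatory work placed in its accept method; from user code, doOnSubscribe offers a similar hook.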
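The ordering described above, where the onSubscribe hook runs before any item is delivered, can be shown with a tiny stdlib-only model. `OnSubscribeHookSketch` and its `subscribe` method are hypothetical names for this illustration and are not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class OnSubscribeHookSketch {

    // Hypothetical miniature of "onSubscribe first, then the main events".
    static List<String> subscribe(String[] items, Consumer<List<String>> onSubscribe) {
        List<String> log = new ArrayList<>();
        onSubscribe.accept(log);          // pre-event hook, runs before any item
        for (String item : items) {
            log.add("onNext:" + item);    // main events
        }
        log.add("onComplete");
        return log;
    }

    public static void main(String[] args) {
        List<String> log = subscribe(new String[] {"Four", "Five"},
                l -> l.add("onSubscribe"));
        // prints: [onSubscribe, onNext:Four, onNext:Five, onComplete]
        System.out.println(log);
    }
}
```

Passing a hook that does nothing corresponds to the Functions.emptyConsumer() case in the walkthrough.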
Next comes the main flow, d.run():
    void run() {
        T[] a = array;
        int n = a.length;
        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                return;
            }
            downstream.onNext(value);
        }
        if (!isDisposed()) {
            downstream.onComplete();
        }
    }
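As the loop shows, run() walks the array and passes each element, here each inner Observable, to downstream.onNext. If the disposable has not been disposed by the end, it calls onComplete().
Following downstream, this is what MergeObserver's onNext does: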
    @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }
        subscribeInner(p);
    }
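Execution ends in subscribeInner(p). Here p is the same object as t, because the mapper is Functions.identity() and its apply returns the argument unchanged. maxConcurrency is 2, so the synchronized block runs, but with only two sources the limit is never hit and nothing is parked in sources: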
    void subscribeInner(ObservableSource<? extends U> p) {
        for (;;) {
            if (p instanceof Supplier) {
                if (tryEmitScalar(((Supplier<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                    boolean empty = false;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            empty = true;
                        }
                    }
                    if (empty) {
                        drain();
                        break;
                    }
                } else {
                    break;
                }
            } else {
                InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId++);
                if (addInner(inner)) {
                    p.subscribe(inner);
                }
                break;
            }
        }
    }
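Neither of our p values (ObservableError and ObservableFromArray) is a Supplier, so the else branch runs: it builds an InnerObserver, stores it with addInner, and then calls subscribe on p. The first value to go through this is ObservableError, whose subscribeActual() looks like this: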
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        Throwable error;
        try {
            error = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
        } catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            error = t;
        }
        EmptyDisposable.error(error, observer);
    }
The error method called on the last line:
    public static void error(Throwable e, Observer<?> observer) {
        observer.onSubscribe(INSTANCE);
        observer.onError(e);
    }
This calls the observer's onSubscribe and then its onError. The observer is the InnerObserver created above, and its onSubscribe method is:
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) d;
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }
d is the INSTANCE passed in above, an enum constant of EmptyDisposable:
    public enum EmptyDisposable implements QueueDisposable<Object> {
        /**
         * Since EmptyDisposable implements QueueDisposable and is empty,
         * don't use it in tests and then signal onNext with it;
         * use Disposables.empty() instead.
         */
        INSTANCE,
        /**
         * An empty disposable that returns false for isDisposed.
         */
        NEVER
        ;
Because EmptyDisposable implements QueueDisposable, execution reaches qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY):
    @Override
    public int requestFusion(int mode) {
        return mode & ASYNC;
    }
The requested mode includes the ASYNC bit, so m evaluates to ASYNC. As a result fusionMode is ASYNC and queue is the EmptyDisposable.
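The bit arithmetic behind that result can be checked in isolation. This sketch assumes the fusion constants have the values used by RxJava 3's QueueFuseable (NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, BOUNDARY = 4); the class `FusionModeSketch` and its helper methods are hypothetical.

```java
final class FusionModeSketch {
    // Assumed to mirror the QueueFuseable constants in RxJava 3.
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;
    static final int BOUNDARY = 4;

    // Same expression as the EmptyDisposable.requestFusion body quoted above.
    static int emptyRequestFusion(int mode) {
        return mode & ASYNC;
    }

    // Turns a granted mode into a readable name for printing.
    static String name(int mode) {
        switch (mode) {
            case NONE: return "NONE";
            case SYNC: return "SYNC";
            case ASYNC: return "ASYNC";
            default: return "mode " + mode;
        }
    }

    public static void main(String[] args) {
        int requested = ANY | BOUNDARY;                  // 0b111 = 7
        int granted = emptyRequestFusion(requested);     // 0b111 & 0b010 = 2
        System.out.println(name(granted));               // prints ASYNC
    }
}
```

A request that carries only the SYNC bit would be answered with NONE by the same expression, since no ASYNC bit survives the mask.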
Next comes onError:
    @Override
    public void onError(Throwable t) {
        if (parent.errors.tryAddThrowableOrReport(t)) {
            if (!parent.delayErrors) {
                parent.disposeAll();
            }
            done = true;
            parent.drain();
        }
    }
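parent.errors.tryAddThrowableOrReport(t) stores the IllegalArgumentException. The delayErrors flag is then checked; it is true here, so disposeAll is skipped, done is set to true, and parent.drain() runs. This is the step that sets the error aside for now, which is why ObservableError comes first in the array yet its error is delivered last. parent.drain is as follows: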
    void drain() {
        if (getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (;;) {
            if (checkTerminate()) {
                return;
            }
            int innerCompleted = 0;
            SimplePlainQueue<U> svq = queue;
            if (svq != null) {
                for (;;) {
                    if (checkTerminate()) {
                        return;
                    }
                    U o = svq.poll();
                    if (o == null) {
                        break;
                    }
                    child.onNext(o);
                    innerCompleted++;
                }
            }
            if (innerCompleted != 0) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    subscribeMore(innerCompleted);
                    innerCompleted = 0;
                }
                continue;
            }
            boolean d = done;
            svq = queue;
            InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length;
            int nSources = 0;
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    nSources = sources.size();
                }
            }
            if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                errors.tryTerminateConsumer(downstream);
                return;
            }
            if (n != 0) {
                int j = Math.min(n - 1, lastIndex);
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }
                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                    SimpleQueue<U> q = is.queue;
                    if (q != null) {
                        for (;;) {
                            U o;
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.tryAddThrowableOrReport(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted++;
                                j++;
                                if (j == n) {
                                    j = 0;
                                }
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }
                            child.onNext(o);
                            if (checkTerminate()) {
                                return;
                            }
                        }
                    }
                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        removeInner(is);
                        innerCompleted++;
                    }
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
            }
            if (innerCompleted != 0) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    subscribeMore(innerCompleted);
                    innerCompleted = 0;
                }
                continue;
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
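As its name suggests, parent.drain empties out all pending events. In this first drain, the poll loop over the inner observer's queue exits at if (o == null) { break; }, because the EmptyDisposable queue has nothing to poll. The finished inner observer is then removed. MergeObserver's own done flag is still false, so the terminal check does not fire, and the loop ends once missed drops to 0 with the error still held in errors.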
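The getAndIncrement / missed bookkeeping in drain and drainLoop is a general serialization pattern: only the caller that moves the counter from 0 enters the loop, and any call arriving meanwhile just bumps the counter so the running loop goes around again. A stripped-down stdlib sketch of that pattern follows; `DrainLoopSketch` and its members are hypothetical and leave out everything specific to MergeObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DrainLoopSketch {
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<String> downstream;

    DrainLoopSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    void offer(String item) {
        queue.offer(item);
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain is already running; it will notice the missed work
        }
        int missed = 1;
        for (;;) {
            String item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            missed = wip.addAndGet(-missed); // were there drain() calls in the meantime?
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        DrainLoopSketch[] self = new DrainLoopSketch[1];
        self[0] = new DrainLoopSketch(item -> {
            out.add(item);
            if (item.equals("first")) {
                self[0].offer("nested"); // reentrant call: only bumps the counter
            }
        });
        self[0].offer("first");
        System.out.println(out); // prints [first, nested]
    }
}
```

The nested offer does not start a second loop; the item it queued is delivered by the loop that was already running.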
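The second element is Observable.just("Four", "Five", "Six"), which is also an ObservableFromArray. Its InnerObserver receives a FromArrayDisposable in onSubscribe, and that is a QueueDisposable too. Per the onSubscribe code shown earlier, when the fusion request is answered with SYNC the inner observer is marked done and parent.drain() polls the three strings straight out of the queue and passes them to child.onNext; subscribeActual then returns at if (d.fusionMode) without calling run(). Either way the three strings are printed. Once the outer ObservableFromArray's run loop has handed over both inner sources, it calls downstream.onComplete(), which is MergeObserver's onComplete: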
    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }
This sets done to true and runs drain again. Looking back at drainLoop, this time the check
    if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
        errors.tryTerminateConsumer(downstream);
        return;
    }
succeeds and the loop returns there, after calling tryTerminateConsumer:
    public void tryTerminateConsumer(Observer<?> consumer) {
        Throwable ex = terminate();
        if (ex == null) {
            consumer.onComplete();
        } else if (ex != ExceptionHelper.TERMINATED) {
            consumer.onError(ex);
        }
    }
Here consumer is the LambdaObserver. terminate() returns the stored IllegalArgumentException, so the LambdaObserver's onError is called:
    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
lazySet(DisposableHelper.DISPOSED) marks the observer as disposed. Because we called subscribe with only an onNext Consumer and no onError handler, onError is the default supplied when the LambdaObserver was built, Functions.ON_ERROR_MISSING, that is:
    static final class OnErrorMissingConsumer implements Consumer<Throwable> {
        @Override
        public void accept(Throwable error) {
            RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
        }
    }
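So an OnErrorNotImplementedException wrapping the IllegalArgumentException is handed to RxJavaPlugins.onError. It is reported there rather than thrown back to the caller of subscribe.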
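The error bookkeeping that runs through this walkthrough (store the error when it arrives, hand it over exactly once at the end) can be modelled with an AtomicReference and a sentinel. This is a simplified sketch: `DelayedErrorHolderSketch` and its method names are hypothetical, and where it attaches later errors as suppressed exceptions, RxJava is understood to combine them into a CompositeException instead.

```java
import java.util.concurrent.atomic.AtomicReference;

final class DelayedErrorHolderSketch {
    // Sentinel meaning "the stored error has already been taken".
    static final Throwable TERMINATED = new Throwable("terminated sentinel");

    private final AtomicReference<Throwable> error = new AtomicReference<>();

    // Records an error for later; returns false once the holder has been terminated.
    boolean tryAdd(Throwable t) {
        for (;;) {
            Throwable current = error.get();
            if (current == TERMINATED) {
                return false;
            }
            if (current != null) {
                current.addSuppressed(t); // simplification of error aggregation
                return true;
            }
            if (error.compareAndSet(null, t)) {
                return true;
            }
        }
    }

    // Takes whatever is stored and leaves the sentinel behind.
    Throwable terminate() {
        return error.getAndSet(TERMINATED);
    }

    // Mirrors the three-way branch of tryTerminateConsumer, returning a label
    // instead of calling an observer.
    String terminateAndDescribe() {
        Throwable ex = terminate();
        if (ex == null) {
            return "onComplete";
        }
        if (ex != TERMINATED) {
            return "onError:" + ex.getMessage();
        }
        return "already terminated";
    }

    public static void main(String[] args) {
        DelayedErrorHolderSketch errors = new DelayedErrorHolderSketch();
        errors.tryAdd(new IllegalArgumentException("bad input")); // arrives first
        // ... other sources would emit their items here ...
        System.out.println(errors.terminateAndDescribe()); // prints onError:bad input
    }
}
```

A holder that never received an error reports onComplete, which corresponds to the ex == null branch.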
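The printed result (screenshot image-20201020162102365): the three strings Four, Five and Six, followed by the report of the OnErrorNotImplementedException.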
-