RxJava Single.zip throws Undeliverabl

2019-11-23  Xigong

Exception message

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

How to reproduce

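import io.reactivex.Single
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {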
    repeat(1000) {

        Single.zip(single(1), single(2),
                BiFunction<Int, Int, String> { t1, t2 ->

                    StringBuilder()
                            .append(t1)
                            .append(t2)
                            .toString()
                })
                .subscribe({
                    println(it)
                }, {
                    it.printStackTrace(System.out)
                })

        Thread.sleep(50)

    }
}

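// Each source emits after 10 ms and then always fails inside map,
// so both inputs of zip report an error at almost the same moment.
private fun single(value: Int): Single<Int> = Single.just(value).delay(10, TimeUnit.MILLISECONDS)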
        .map { throw RuntimeException(it.toString()) }

Analyzing the cause of the error

Code analysis

io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator#innerError

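        void innerError(Throwable ex, int index) {
            // First error: dispose the other sources and call the downstream observer's onError
            if (getAndSet(0) > 0) {
                disposeExcept(index);
                downstream.onError(ex);
            } else {
                // Second error: the downstream has already terminated, so the exception is handed to RxJavaPlugins
                RxJavaPlugins.onError(ex);
            }
        }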
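To see why the second failure ends up in RxJavaPlugins.onError, the gate in innerError can be modeled with nothing but the standard library. This is a simplified sketch, not RxJava's code: ZipGateModel, delivered and undeliverable are hypothetical names, and the counter is assumed to start at the number of zipped sources, which is what the getAndSet(0) > 0 check suggests.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the error gate; not the RxJava class itself.
class ZipGateModel(sourceCount: Int) {
    // Assumption: the counter starts at the number of zipped sources.
    private val remaining = AtomicInteger(sourceCount)

    // Stands in for downstream.onError.
    val delivered = mutableListOf<String>()

    // Stands in for RxJavaPlugins.onError.
    val undeliverable = mutableListOf<String>()

    fun innerError(message: String) {
        if (remaining.getAndSet(0) > 0) {
            // The first failure wins the gate and reaches the subscriber.
            delivered += message
        } else {
            // A later failure finds the counter at 0: no subscriber is left to receive it.
            undeliverable += message
        }
    }
}
```

Calling innerError("1") and then innerError("2") on ZipGateModel(2) leaves "1" in delivered and "2" in undeliverable. That matches the reproduction: both sources fail after the same 10 ms delay, and the loser of the race is reported as an UndeliverableException.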

How to fix it?

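A simple fix is to replace Single with Observable or Flowable. In the Observable version of zip, the first error sets cancelled, and any later error makes checkTerminated return early instead of being passed to RxJavaPlugins.onError:

io.reactivex.internal.operators.observable.ObservableZip.ZipCoordinator#drain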

        public void drain() {
            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row;
            final boolean delayError = this.delayError;

            for (;;) {

                for (;;) {
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {
                            boolean d = z.done;
                            T v = z.queue.poll();
                            boolean empty = v == null;

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {
                                os[i] = v;
                            } else {
                                emptyCount++;
                            }
                        } else {
                            if (z.done && !delayError) {
                                Throwable ex = z.error;
                                if (ex != null) {
                                    cancelled = true;
                                    cancel();
                                    a.onError(ex);
                                    return;
                                }
                            }
                        }
                        i++;
                    }

                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                        v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        cancel();
                        a.onError(ex);
                        return;
                    }

                    a.onNext(v);

                    Arrays.fill(os, null);
                }

                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
            if (cancelled) {
                cancel();
                return true;
            }

            if (d) {
                if (delayError) {
                    if (empty) {
                        Throwable e = source.error;
                        cancelled = true;
                        cancel();
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        return true;
                    }
                } else {
                    Throwable e = source.error;
                    if (e != null) {
                        // This branch runs when a source has failed
                        cancelled = true;
                        cancel();
                        a.onError(e);
                        return true;
                    } else
                    if (empty) {
                        cancelled = true;
                        cancel();
                        a.onComplete();
                        return true;
                    }
                }
            }

            return false;
        }
    }
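The difference from Single.zip comes down to the cancelled check at the top of checkTerminated. Here is a minimal model of that check, using only the standard library. It is a sketch under assumptions, not RxJava's code: ZipDrainModel, sourceFailed and dropped are hypothetical names, and the model is single-threaded because the real drain loop serializes access through getAndIncrement.

```kotlin
// Hypothetical model of the terminal check; not the RxJava class itself.
class ZipDrainModel {
    private var cancelled = false

    // Stands in for a.onError.
    val delivered = mutableListOf<String>()

    // Model-only bookkeeping: counts failures that were ignored.
    var dropped = 0
        private set

    // Mirrors the non-delayError branch of checkTerminated for a failed source.
    fun sourceFailed(message: String) {
        if (cancelled) {
            // A later failure sees the flag and returns without reporting anything.
            dropped++
            return
        }
        cancelled = true
        delivered += message
    }
}
```

With two calls to sourceFailed, the first message is delivered and the second is only counted in dropped. In this model nothing reaches a global error handler, which is why the switch to Observable makes the exception disappear. The cost is that the second failure is discarded silently.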

Reflections on this problem

Why does this problem occur in the first place?
