Android-RxJavaRxJavaAndroid技术知识

RxJava Error Handling Operators

2016-10-04  本文已影响273人  lluo2010

RxJava Error Handling Operators

简介

RxJava提供了一些操作函数去处理错误相关的情景,使用这些函数可以:

相关处理函数

处理错误相关的操作函数包括:

onErrorReturn

    public final Observable<T> onErrorReturn(Func1<java.lang.Throwable,? extends T> resumeFunction)

Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

一般出错时,会调用onError,然后结束.使用onErrorReturn,onError不会调用,而是发送onErrorReturn里面提供的值,然后onCompleted会被调用. 注意,紧随其后onCompleted会调用.

例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>(){
            @Override
            public void call(Subscriber<? super String> arg0) {
                arg0.onNext("a1");
                arg0.onNext("a2");
                arg0.onError(new Exception("testErrorReturn mock error"));
            }

        });

        observable.onErrorReturn(new Func1<Throwable, String>() {

            @Override
            public String call(Throwable arg0) {
                Utils.println("onErrorReturn call");
                return "aError";
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                Utils.println("onCompleted");
            }

            @Override
            public void onError(Throwable arg0) {
                Utils.println("onError");
            }

            @Override
            public void onNext(String arg0) {
                Utils.println("onResult:"+arg0);
            }
        });
输出:
    onResult:a1
    onResult:a2
    onErrorReturn call
    onResult:aError
    onCompleted

onErrorResumeNext

声明:

    public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {
    public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction);

如果原Observable发生错误时,不调用onError,而是使用一个新的Observable,调用它的call函数(其实就是订阅它),里面onNext的值回传到subscriber,如果有调onCompleted,则subscriber会收到,如果没有则不会收到. 即后续的行为由新的Observable决定了.

上面两个函数的区别是第一个始终提供一个Observable,而第二个可以根据不同的Throwable提供不同的Observable.

例子:

        Observable<String> observable = Observable.create(new OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> arg0) {
                arg0.onNext("value1");
                arg0.onNext("value2");
                arg0.onNext("value3");
                arg0.onError(new Exception("mock testErrorResumeNext exception"));
                arg0.onNext("value4");
            }
        });
        Observable<String> resumeObservable = Observable.create(new OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> arg0) {
                arg0.onNext("valueMock1");
                arg0.onNext("valueMock2");
            }
        });

        observable.onErrorResumeNext(resumeObservable).subscribe(getSubscriber());
输出:
    onResult:value1
    onResult:value2
    onResult:value3
    onResult:valueMock1
    onResult:valueMock2

如果resumeObservable调用onCompleted,则onCompleted会被调用.

onExceptionResumeNext

    public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence);

Instructs an Observable to pass control to another Observable rather than invoking Observer#onError if it encounters an Exception.

*** 和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception. ***

retry

if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error.

如果Observable.onError调用,即发射错误,会重新订阅原来的Obserable,即重新触发Observable,也就是从新调它的call函数.这种方式数据会重复.

  1. public final Observable<T> retry():
    无限重试,直到成功.
  2. public final Observable<T> retry(final long count):
    指定次数重试,超过指定次数还没成功,则subscriber.onError会被调用.
  3. public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate):
    传入一个Fun2,里面有两个入参,一个表示目前参数的次数,另一个就是Observable.onError里的类型,返回值true表示继续重试,false表示不重试,Subscriber.onError会被调用.
    也就是说只要Observable.onError被调用后,会调用Func2去判断是否要重试,Integer表示询问重试的次数,从1开始.

retry(count)例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> arg0) {
                arg0.onNext("value1");
                arg0.onNext("value2");
                arg0.onNext("value3");
                arg0.onError(new Exception("mock testRetryCount exception"));
                arg0.onNext("value4");
            }
        });

        observable.retry(1).subscribe(getSubscriber());
输出:
    onResult:value1
    onResult:value2
    onResult:value3
    onResult:value1
    onResult:value2
    onResult:value3
    onError

从上面可以看出,重试了一次,后面不重试,Subscriber.onError被调用.另外由于重试,数据重复了.

retry(Func2)例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> arg0) {
                arg0.onNext("value1");
                arg0.onNext("value2");
                arg0.onNext("value3");
                arg0.onError(new Exception("mock testRetryFunc2 exception"));
                arg0.onNext("value4");
            }
        });

        observable.retry(new Func2<Integer, Throwable,     Boolean>() {
            @Override
            public Boolean call(Integer arg0, Throwable arg1) {
                Utils.println("testRetryFunc2 func2::call time:"+arg0.intValue());
                return (arg0.intValue()>1)?false:true;
            }
        })
        .subscribe(getSubscriber());
输出:
    onResult:value1
    onResult:value2
    onResult:value3
    testRetryFunc2 func2::call time:1
    onResult:value1
    onResult:value2
    onResult:value3
    testRetryFunc2 func2::call time:2
    onError

Integer从1开始,第一次重试,第二次返回false所以就不重试了,直接onError了.

retryWhen

    public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);
    public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler);

if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

retryWhen的输入参数是Func1,接受一个输入Observable<Throwable>, 返回输出参数Observable<?>。

返回的Observable<?>所要发送的事件决定了重试是否会发生:

这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

*** 可以理解为如果原Observable.onError发生, 如果Func1返回的Observable<?>的onNext被调用,则重新订阅原来的Observable重试,否则不重试.***

输入的Observable(即Func1里Observable<?extends Throwable>)必须作为输出Observable(即Func1的返回值)的源。我们必须对Observable<Throwable>做出反应,然后基于它发送事件;不能只返回一个通用泛型流。

原Observable每次一调用onError(Throwable),Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用我们都需要决定是否需要重订阅。

下面的例子会重试:

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {
                return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                  @Override public Observable<?> call(Throwable error) {
                    return Observable.timer(5, TimeUnit.SECONDS);
                  }
                });
              }
            })

总结:

  1. onErrorReturn,吞掉错误,提供一个默认的返回值.
  2. onErrorResumeNext,吞掉错误,提供一个默认的Observable继续发送数据.
  3. onExceptionResumeNext,和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception.
  4. retry,吞掉错误,按条件重新订阅重试
  5. retryWhen,吞掉错误,由另外一个提供的Observable的行为决定是否重试,如果onNext被调用,则重新订阅重试,onError或者onComplete被调用,则不重试.

RxJava特有的异常

...

...

Reference

  1. Error Handling Operators
  2. RxJava错误处理
  3. RxJava操作符(五)Error Handling
  4. RxJava 教程第三部分:驯服数据流之 高级错误处理
上一篇 下一篇

猜你喜欢

热点阅读