Android开发

RxJava学习(六)retry,retryWhen,repea

2018-04-08  本文已影响0人  大虾啊啊啊

1.retry
当出现错误事件,即当被观察者发送错误事件时。被观察者重新订阅并发送事件。并且是无条件重新订阅。


 //当出现错误的时候,无条件重新发送
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new NullPointerException());
                e.onNext(3);
            }
        })
    //3 重新订阅发送的次数
    .retry(3,new Predicate<Throwable>() {
            @Override
            public boolean test(Throwable throwable) throws Exception {
              //返回true表示重新订阅发送事件
                return true;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: "+integer );
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

2.retryWhen
当出现错误事件,即当被观察者发送错误事件时。被观察者重新订阅并发送事件。并且是有条件重新订阅。
条件为:
1.当apply方法返回的是Observable.empty(),Observable.error()。
则不发送事件。
2.throwableObservable.delay(1,TimeUnit.SECONDS);为重新订阅发送事件的条件,
每隔1秒重新订阅发送事件

Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onError(new NullPointerException());
            e.onNext(3);
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            return Observable.empty();
          //  return throwableObservable.delay(1,TimeUnit.SECONDS);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "onNext: "+integer );
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

3.repeat
无条件重新订阅发送事件

//无条件重新订阅
    Observable.just(2).repeat().subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "onNext: "+integer );
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

4.repeatWhen
有条件重新订阅发送事件
条件和retryWhen原理一样

   //有条件重新订阅
    Observable.just(2).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
            //传递被观察者的事件
            //制定轮询时间,每一秒钟轮询一次
            //当发送error或者empty时间,轮询被终止
           // Observable.empty();
          //  Observable.error(new NullPointerException());
            return objectObservable.delay(1,TimeUnit.SECONDS);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }
        @Override
        public void onNext(Integer integer) {
            Log.e(TAG, "onNext: "+integer );
        }
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " );
        }
        @Override
        public void onComplete() {

        }
    });
上一篇 下一篇

猜你喜欢

热点阅读