RxJava

RxJava出错重连

2019-01-10  本文已影响19人  shiyuzhe

retryWhen

  模拟网络请求出错重连
 /**
     * Simulates a network request that fails and is retried up to three times.
     *
     * Test scenario: the first two attempts throw, the third attempt succeeds.
     * Each retry is delayed by 1, 2 and 3 seconds respectively. If the request
     * still fails after the third retry, the last error is delivered to the
     * subscriber instead of the stream completing silently.
     *
     * Note: the Disposable returned by the subscription is not kept, so this
     * version cannot be cancelled by the caller.
     */
    fun retry() {
        var num = 0 // number of attempts made so far
        val maxRetries = 3
        Observable.timer(1, TimeUnit.SECONDS).doOnSubscribe {
            // Placed upstream of retryWhen, so it runs on every (re)subscription.
            println("subscribing")
        }.map {
            if (++num > 2)
                return@map 1
            throw RuntimeException()
        }.retryWhen { errors ->
            val counter = AtomicInteger()
            errors.flatMap { cause ->
                val attempt = counter.incrementAndGet()
                if (attempt > maxRetries) {
                    // Retries exhausted: forward the failure. Ending the error stream
                    // with takeWhile would complete the source and hide the error.
                    Observable.error<Long>(cause)
                } else {
                    println("delay retry by $attempt second(s)")
                    Observable.timer(attempt.toLong(), TimeUnit.SECONDS)
                }
            }
        }.observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribeBy(
                        onError = { println("retry failed: $it") },
                        onNext = { println("subscribeBy$it") }
                )
    }

解决多次调用retry()导致重复订阅的问题:在新的订阅开始(subscribe)之前,先dispose掉上一次的订阅

  /** Subscription created by the most recent call to [retry], or null if none yet. */
  private var disposable: Disposable? = null

    /**
     * Simulates a network request that fails and is retried up to three times,
     * cancelling the subscription left over from a previous call.
     *
     * Test scenario: the first two attempts throw, the third attempt succeeds.
     * Each retry is delayed by 1, 2 and 3 seconds respectively. If the request
     * still fails after the third retry, the last error is delivered to the
     * subscriber instead of the stream completing silently.
     */
    fun retry() {
        var num = 0 // number of attempts made so far
        val maxRetries = 3
        disposable = Observable.timer(1, TimeUnit.SECONDS)
                .map {
                    if (++num > 2)
                        return@map 1
                    throw RuntimeException()
                }.retryWhen { errors ->
                    val counter = AtomicInteger()
                    errors.flatMap { cause ->
                        val attempt = counter.incrementAndGet()
                        if (attempt > maxRetries) {
                            // Retries exhausted: forward the failure. Ending the error stream
                            // with takeWhile would complete the source and hide the error.
                            Observable.error<Long>(cause)
                        } else {
                            println("delay retry by $attempt second(s)")
                            Observable.timer(attempt.toLong(), TimeUnit.SECONDS)
                        }
                    }
                }.observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .doOnSubscribe {
                    // Downstream of retryWhen this runs once per retry() call; upstream of it,
                    // it would run on every re-subscription. The property is reassigned only
                    // after the chain has been subscribed, so here it still refers to the
                    // subscription made by the previous call.
                    disposable?.dispose()
                    println("subscribing")
                }.subscribeBy(
                        onError = { println("retry failed: $it") },
                        onNext = { println("subscribeBy$it") }
                )
    }

将重连放到扩展函数中

/**
 * Runs the simulated failing request through [subscribeByThreadRetry].
 *
 * The source throws on its first two runs and emits 1 on the third. The currently
 * stored disposable is handed to the extension and the newly returned one replaces it.
 */
fun ret() {
    var attempts = 0 // how many times the source has been run
    val source = Observable.timer(1, TimeUnit.SECONDS).map {
        attempts += 1
        if (attempts <= 2) throw RuntimeException()
        1
    }
    disposable = source.subscribeByThreadRetry(disposable) {
        System.out.println("subscribeByThreadRetry:$it")
    }
}
/**
 * Subscribes to this [Observable], retrying up to three times when it fails.
 *
 * Retries are delayed by 1, 2 and 3 seconds respectively. The source is subscribed
 * on [Schedulers.io] and results are observed on [AndroidSchedulers.mainThread].
 * When the source still fails after the third retry, the last error is passed to
 * [onErrStub] instead of the stream completing silently.
 *
 * @param disposable a previous subscription to dispose when the new one starts; may be null.
 * @param onErrStub called with the final error once all retries are exhausted.
 * @param onNextStub called for every item emitted by the source.
 * @return the [Disposable] of the new subscription.
 */
fun <T : Any> Observable<T>.subscribeByThreadRetry(
    disposable: Disposable?,
    onErrStub: (Throwable) -> Unit = {},
    onNextStub: (T) -> Unit = {}
): Disposable = this.retryWhen { errors ->
    val maxRetries = 3
    val counter = AtomicInteger()
    errors.flatMap { cause ->
        val attempt = counter.incrementAndGet()
        if (attempt > maxRetries) {
            // Retries exhausted: forward the failure. Ending the error stream
            // with takeWhile would complete the source and never reach onErrStub.
            Observable.error<Long>(cause)
        } else {
            Observable.timer(attempt.toLong(), TimeUnit.SECONDS)
        }
    }
}.observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .doOnSubscribe {
        // Downstream of retryWhen this runs once per subscription; upstream of it,
        // it would run on every re-subscription.
        disposable?.dispose()
    }.subscribeBy(onErrStub, {}, onNextStub)

上一篇下一篇

猜你喜欢

热点阅读