RxJava出错重连
2019-01-10 本文已影响19人
shiyuzhe
retryWhen
模拟网络请求出错重连
/**
 * Simulates a network request that fails and is retried via `retryWhen`.
 *
 * The source is a one-second timer whose `map` step throws on its first two
 * runs and emits `1` on the third, so the request succeeds on the second retry.
 * The retry handler permits at most three retries and waits 1, 2 and 3 seconds
 * before them respectively; after that `takeWhile` ends the handler stream.
 */
fun retry() {
    var attempts = 0 // how many times the map step has run
    val request = Observable.timer(1, TimeUnit.SECONDS)
        .doOnSubscribe { println("subscribing") }
        .map {
            attempts += 1
            if (attempts > 2) 1 else throw RuntimeException()
        }

    request
        .retryWhen { errors ->
            val retryCount = AtomicInteger()
            errors
                .takeWhile { retryCount.getAndIncrement() != 3 }
                .flatMap {
                    // retryCount was already incremented by takeWhile, so it is 1, 2, 3 here
                    println("delay retry by ${retryCount.get()} second(s)")
                    Observable.timer(retryCount.toLong(), TimeUnit.SECONDS)
                }
        }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribeBy { value -> println("subscribeBy$value") }
}
解决多次调用retry()导致重复订阅的问题:在subscribe之前先dispose掉上一次的订阅
// Holds the subscription created by the most recent call; the functions below
// dispose it when the next subscription starts.
private var disposable: Disposable? = null
/**
 * Simulates a failing request with up to three delayed retries and disposes the
 * subscription stored in `disposable` before the new one starts.
 *
 * The `map` step throws on its first two runs and emits `1` on the third, so
 * the request succeeds on the second retry. Retries wait 1, 2 and 3 seconds.
 */
fun retry() {
    var attempts = 0 // how many times the map step has run
    val request = Observable.timer(1, TimeUnit.SECONDS).map {
        attempts += 1
        if (attempts > 2) 1 else throw RuntimeException()
    }

    val withRetry = request.retryWhen { errors ->
        val retryCount = AtomicInteger()
        errors
            .takeWhile { retryCount.getAndIncrement() != 3 }
            .flatMap {
                println("delay retry by ${retryCount.get()} second(s)")
                Observable.timer(retryCount.toLong(), TimeUnit.SECONDS)
            }
    }

    disposable = withRetry
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .doOnSubscribe {
            // Placed after retryWhen, this runs once per call; placed before it,
            // it would run again on every retry.
            // `disposable` still refers to the previous subscription here because
            // the assignment above only happens after subscribeBy returns.
            disposable?.dispose()
            println("subscribing")
        }
        .subscribeBy { value -> println("subscribeBy$value") }
}
将重连逻辑封装到扩展函数中
/**
 * Runs the simulated request (fails twice, then emits `1`) and hands retrying,
 * threading and disposal of the previous subscription to `subscribeByThreadRetry`.
 */
fun ret() {
    var attempts = 0 // how many times the map step has run
    val request = Observable.timer(1, TimeUnit.SECONDS).map {
        attempts += 1
        if (attempts > 2) 1 else throw RuntimeException()
    }

    // The value of `disposable` passed in is the previous subscription (if any);
    // the field is overwritten only after the new subscription has been created.
    disposable = request.subscribeByThreadRetry(disposable) { value ->
        println("subscribeByThreadRetry:$value")
    }
}
/**
 * Subscribes to this observable with delayed retries, subscribing upstream on
 * `Schedulers.io()` and observing on `AndroidSchedulers.mainThread()`.
 *
 * A failing upstream is resubscribed up to three times, waiting 1, 2 and 3
 * seconds before the respective retry. If it fails again after the third
 * retry, that error is forwarded to [onErrStub] rather than being dropped.
 *
 * @param disposable a subscription to dispose when the new one is subscribed, or `null`.
 * @param onErrStub called with the error once all retries are exhausted.
 * @param onNextStub called for every item the upstream emits.
 * @return the [Disposable] of the newly created subscription.
 */
fun <T : Any> Observable<T>.subscribeByThreadRetry(
    disposable: Disposable?,
    onErrStub: (Throwable) -> Unit = {},
    onNextStub: (T) -> Unit = {}
): Disposable = this
    .retryWhen { errors ->
        val maxRetries = 3
        val retryCount = AtomicInteger()
        errors.flatMap { error ->
            val retryNumber = retryCount.incrementAndGet()
            if (retryNumber > maxRetries) {
                // Retries exhausted: surface the failure. Ending the handler
                // stream here instead would complete the subscription silently
                // and onErrStub would never be called.
                Observable.error<Long>(error)
            } else {
                // Linear back-off: wait as many seconds as the retry number.
                Observable.timer(retryNumber.toLong(), TimeUnit.SECONDS)
            }
        }
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .doOnSubscribe {
        // Placed after retryWhen, this runs once per subscription; placed
        // before it, it would run again on every retry.
        disposable?.dispose()
    }
    .subscribeBy(onErrStub, {}, onNextStub)