RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战系列文章
RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用
一、示例
1.1 应用场景
今天,我们介绍一种新的场景,轮询操作。也就是说,我们会尝试间隔一段时间就向服务器发起一次请求,在使用RxJava
之前,该需求的实现一般有两种方式:
- 通过
Handler
发送延时消息,在handleMessage
中请求服务器之后,再次发送一个延时消息,直到达到循环次数为止。 - 使用
Java
提供的定时器Timer
。
我们尝试使用RxJava2
提供的操作符来实现这一需求,这里演示两种方式的轮询,并将单次访问的次数限制在5
次:
- 固定时延:使用
intervalRange
操作符,每间隔3s
执行一次任务。 - 变长时延:使用
repeatWhen
操作符实现,第一次执行完任务后,等待4s
再执行第二次任务,在第二次任务执行完成后,等待5s
,依次递增。
2.2 示例
public class PollingActivity extends AppCompatActivity {
private static final String TAG = PollingActivity.class.getSimpleName();
private TextView mTvSimple;
private TextView mTvAdvance;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_polling);
mTvSimple = (TextView) findViewById(R.id.tv_simple);
mTvSimple.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startSimplePolling();
}
});
mTvAdvance = (TextView) findViewById(R.id.tv_advance);
mTvAdvance.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startAdvancePolling();
}
});
mCompositeDisposable = new CompositeDisposable();
}
private void startSimplePolling() {
Log.d(TAG, "startSimplePolling");
Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
doWork(); //这里使用了doOnNext,因此DisposableObserver的onNext要等到该方法执行完才会回调。
}
});
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
private void startAdvancePolling() {
Log.d(TAG, "startAdvancePolling click");
Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {
@Override
public void run() throws Exception {
doWork();
}
}).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {
private long mRepeatCount;
@Override
public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
//必须作出反应,这里是通过flatMap操作符。
return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Object o) throws Exception {
if (++mRepeatCount > 4) {
//return Observable.empty(); //发送onComplete消息,无法触发下游的onComplete回调。
return Observable.error(new Throwable("Polling work finished")); //发送onError消息,可以触发下游的onError回调。
}
Log.d(TAG, "startAdvancePolling apply");
return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
}
});
}
});
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
private DisposableObserver<Long> getDisposableObserver() {
return new DisposableObserver<Long>() {
@Override
public void onNext(Long aLong) {}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
}
};
}
private void doWork() {
long workTime = (long) (Math.random() * 500) + 500;
try {
Log.d(TAG, "doWork start, threadId=" + Thread.currentThread().getId());
Thread.sleep(workTime);
Log.d(TAG, "doWork finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}
startSimplePolling
对应于固定时延轮询:

startAdvancePolling
对应于变长时延轮询:
三、示例解析
下面,就让我们一起来分析一下上面这两个例子中涉及到的知识点。
3.1 intervalRange & doOnNext 实现固定时延轮询
对于固定时延轮询的需求,采用的是intervalRange
的方式来实现,它是一个创建型操作符,该Observable
第一次先发射一个特定的数据,之后间隔一段时间再发送一次,它是interval
和range
的结合体,这两个操作符的原理图为:


该操作符的优势在于:
- 与
interval
相比,它可以指定第一个发送数据项的时延、指定发送数据项的个数。 - 与
range
相比,它可以指定两项数据之间发送的时延。
intervalRange
的接收参数的含义为:
-
start
:发送数据的起始值,为Long
型。 -
count
:总共发送多少项数据。 -
initialDelay
:发送第一个数据项时的起始时延。 -
period
:两项数据之间的间隔时间。 -
TimeUnit
:时间单位。
在轮询操作中一般会进行一些耗时的网络请求,因此我们选择在doOnNext
进行处理,它会在下游的onNext
方法被回调之前调用,但是它的运行线程可以通过subscribeOn
指定,下游的运行线程再通过observerOn
切换会主线程,通过打印对应的线程ID
可以验证结果。
当要求的数据项都发送完毕之后,最后会回调onComplete
方法。
3.2 repeatWhen 实现变长时延轮询
3.2.1 使用 repeatWhen 实现重订阅
之所以可以通过repeatWhen
来实现轮询,是因为它为我们提供了重订阅的功能,而重订阅有两点要素:
- 上游告诉我们一次订阅已经完成,这就需要上游回调
onComplete
函数。 - 我们告诉上游是否需要重订阅,通过
repeatWhen
的Function
函数所返回的Observable
确定,如果该Observable
发送了onComplete
或者onError
则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。
其原理图如下所示:

repeatWhen
的难点在于如何定义它的Function
参数:
-
Function
的输入是一个Observable<Object>
,输出是一个泛型ObservableSource<?>
。 - 如果输出的
Observable
发送了onComplete
或者onError
则表示不需要重订阅,结束整个流程;否则触发重订阅的操作。也就是说,它 仅仅是作为一个是否要触发重订阅的通知,onNext
发送的是什么数据并不重要。 -
对于每一次订阅的数据流 Function 函数只会回调一次,并且是在
onComplete
的时候触发,它不会收到任何的onNext
事件。 - 在
Function
函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap
操作符接收上游的数据,对于flatMap
的解释,大家可以参考 RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯 。
而当我们不需要重订阅时,有两种方式:
- 返回
Observable.empty()
,发送onComplete
消息,但是DisposableObserver
并不会回调onComplete
。 - 返回
Observable.error(new Throwable("Polling work finished"))
,DisposableObserver
的onError
会被回调,并接受传过去的错误信息。
3.2.2 使用 Timer 实现两次订阅之间的时延
以上就是对于repeatWhen
的解释,与repeatWhen
相类似的还有retryWhen
操作符,这个我们在下一篇文章中再介绍,接下来,我们看一下如何实现两次事件的时延。
前面我们分析过,重订阅触发的时间是在返回的ObservableSource
发送了onNext
事件之后,那么我们通过该ObservableSource
延迟发送一个事件就可以实现相应的需求,这里使用的是time
操作符,它的原理图如下所示,也就是,在订阅完成后,等待指定的时间它才会发送消息。

3.2.3 使用 doOnComplete 完成轮询的耗时操作
由于在订阅完成时会发送onComplete
消息,那么我们就可以在doOnComplete
中进行轮询所要进行的具体操作,它所运行的线程通过subscribeOn
指定。
更多文章,欢迎访问我的 Android 知识梳理系列:
- Android 知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
- 个人主页:http://lizejun.cn
- 个人知识总结目录:http://lizejun.cn/categories/