安卓精华教程RxJavaAndroid开发经验谈

RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作

2017-08-29  本文已影响2915人  泽毛

RxJava2 实战系列文章

RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新
RxJava2 实战知识梳理(2) - 计算一段时间内数据的平均值
RxJava2 实战知识梳理(3) - 优化搜索联想功能
RxJava2 实战知识梳理(4) - 结合 Retrofit 请求新闻资讯
RxJava2 实战知识梳理(5) - 简单及进阶的轮询操作
RxJava2 实战知识梳理(6) - 基于错误类型的重试请求
RxJava2 实战知识梳理(7) - 基于 combineLatest 实现的输入表单验证
RxJava2 实战知识梳理(8) - 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程
RxJava2 实战知识梳理(9) - 使用 timer/interval/delay 实现任务调度
RxJava2 实战知识梳理(10) - 屏幕旋转导致 Activity 重建时恢复任务
RxJava2 实战知识梳理(11) - 检测网络状态并自动重试请求
RxJava2 实战知识梳理(12) - 实战讲解 publish & replay & share & refCount & autoConnect
RxJava2 实战知识梳理(13) - 如何使得错误发生时不自动停止订阅关系
RxJava2 实战知识梳理(14) - 在 token 过期时,刷新过期 token 并重新发起请求
RxJava2 实战知识梳理(15) - 实现一个简单的 MVP + RxJava + Retrofit 应用


一、示例

1.1 应用场景

今天,我们介绍一种新的场景,轮询操作。也就是说,我们会尝试间隔一段时间就向服务器发起一次请求,在使用RxJava之前,该需求的实现一般有两种方式:

我们尝试使用RxJava2提供的操作符来实现这一需求,这里演示两种方式的轮询,并将单次访问的次数限制在5次:

2.2 示例

public class PollingActivity extends AppCompatActivity {

    private static final String TAG = PollingActivity.class.getSimpleName();

    private TextView mTvSimple;
    private TextView mTvAdvance;
    private CompositeDisposable mCompositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_polling);
        mTvSimple = (TextView) findViewById(R.id.tv_simple);
        mTvSimple.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startSimplePolling();
            }

        });
        mTvAdvance = (TextView) findViewById(R.id.tv_advance);
        mTvAdvance.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startAdvancePolling();
            }

        });
        mCompositeDisposable = new CompositeDisposable();
    }

    private void startSimplePolling() {
        Log.d(TAG, "startSimplePolling");
        Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long aLong) throws Exception {
                doWork(); //这里使用了doOnNext,因此DisposableObserver的onNext要等到该方法执行完才会回调。
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private void startAdvancePolling() {
        Log.d(TAG, "startAdvancePolling click");
        Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {
                doWork();
            }

        }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {

            private long mRepeatCount;

            @Override
            public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
                //必须作出反应,这里是通过flatMap操作符。
                return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {

                    @Override
                    public ObservableSource<Long> apply(Object o) throws Exception {
                        if (++mRepeatCount > 4) {
                            //return Observable.empty(); //发送onComplete消息,无法触发下游的onComplete回调。
                            return Observable.error(new Throwable("Polling work finished")); //发送onError消息,可以触发下游的onError回调。
                        }
                        Log.d(TAG, "startAdvancePolling apply");
                        return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                    }

                });
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private DisposableObserver<Long> getDisposableObserver() {

        return new DisposableObserver<Long>() {

            @Override
            public void onNext(Long aLong) {}

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
            }
        };
    }

    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}

startSimplePolling对应于固定时延轮询:


startAdvancePolling对应于变长时延轮询:

三、示例解析

下面,就让我们一起来分析一下上面这两个例子中涉及到的知识点。

3.1 intervalRange & doOnNext 实现固定时延轮询

对于固定时延轮询的需求,采用的是intervalRange的方式来实现,它是一个创建型操作符,该Observable第一次先发射一个特定的数据,之后间隔一段时间再发送一次,它是intervalrange的结合体,这两个操作符的原理图为:

interval 原理图
range 原理图
该操作符的优势在于:

intervalRange的接收参数的含义为:

在轮询操作中一般会进行一些耗时的网络请求,因此我们选择在doOnNext进行处理,它会在下游的onNext方法被回调之前调用,但是它的运行线程可以通过subscribeOn指定,下游的运行线程再通过observerOn切换会主线程,通过打印对应的线程ID可以验证结果。

当要求的数据项都发送完毕之后,最后会回调onComplete方法。

3.2 repeatWhen 实现变长时延轮询

3.2.1 使用 repeatWhen 实现重订阅

之所以可以通过repeatWhen来实现轮询,是因为它为我们提供了重订阅的功能,而重订阅有两点要素:

其原理图如下所示:

repeatWhen 原理图
repeatWhen的难点在于如何定义它的Function参数:

而当我们不需要重订阅时,有两种方式:

3.2.2 使用 Timer 实现两次订阅之间的时延

以上就是对于repeatWhen的解释,与repeatWhen相类似的还有retryWhen操作符,这个我们在下一篇文章中再介绍,接下来,我们看一下如何实现两次事件的时延。

前面我们分析过,重订阅触发的时间是在返回的ObservableSource发送了onNext事件之后,那么我们通过该ObservableSource延迟发送一个事件就可以实现相应的需求,这里使用的是time操作符,它的原理图如下所示,也就是,在订阅完成后,等待指定的时间它才会发送消息。

timer 原理图

3.2.3 使用 doOnComplete 完成轮询的耗时操作

由于在订阅完成时会发送onComplete消息,那么我们就可以在doOnComplete中进行轮询所要进行的具体操作,它所运行的线程通过subscribeOn指定。


更多文章,欢迎访问我的 Android 知识梳理系列:

上一篇下一篇

猜你喜欢

热点阅读