RxJava2.0背压与不背压

2017-05-12  本文已影响2442人  CHSmile
RxJava是什么?

一个基于观察者模式(事件流)的异步任务库。可以很简洁地完成一个异步任务,当任务复杂时也能清晰地表达逻辑。GitHub地址。,具体的一些理论可以查看抛物线
这边文章《给 Android 开发者的 RxJava 详解》,很好的入门教程。

基本使用

在RxJava2.0中,把背压和非背压分两种观察者模式。
背压:事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。

1、非背压

 /**
 * 非背压
 * Observable对应Observer
 */
private void createObservable() {
    //被观察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("This");
            e.onNext("is");
            e.onNext("RxJava");
            e.onComplete();
        }
    });
    //观察者
    Observer<String> observer = new Observer<String>() {
        Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            //取消订阅
            if (!disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    };
    observable.subscribe(observer);
}

2、背压

/**
 * 背压(在异步过程中,由于被观察者发射数据过快,而观察者处理数据不及时,
 * 导致内存里堆积了太多数据,从而OOM,可以选择不同的策略处理该问题)
 * Flowable对应subscriber
 */
private void createFlowable() {
    Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception {
            if (!e.isCancelled()) {
                e.onNext("This");
                e.onNext("is");
                e.onNext("RxJava");
                e.onComplete();
            }
        }
        //抛弃策略
    }, BackpressureStrategy.DROP);

    Subscriber<String> subscriber = new Subscriber<String>() {
        Subscription subscription;
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            //请求一个数据
            subscription.request(1);
        }

        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext: " + s);
            //处理完后,再请求一个数据
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            //取消订阅
            subscription.cancel();
        }
    };
    flowable.subscribe(subscriber);
}
上一篇下一篇

猜你喜欢

热点阅读