RxJava2 线程Flowable的简单使用

2019-07-30  本文已影响0人  Ovadyah

在RxJava2中使用背压非常简单,由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
  @Override
  public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
      emitter.onNext(1);
      emitter.onNext(2);
      emitter.onComplete();
    }
 }, BackpressureStrategy.BUFFER);

而在RxJava2.0 中,Observable 不再支持背压,而是改用Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题而新增的(抽象)类。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。
上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:

BackpressureStrategy.BUFFER
BackpressureStrategy.DROP
BackpressureStrategy.LATEST

onBackpressureBuffer:是不丢弃数据的处理方式。
把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。

onBackpressureDrop和onBackpressureLatest比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别。
onBackpressureDrop :直接丢弃数据,不缓存任何数据;
onBackpressureLatest :则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。可以结合下面两幅图来理解。

如果增加订阅:

Flowable.create(new FlowableOnSubscribe<Integer>() {
  @Override
  public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
      emitter.onNext(1);
      emitter.onNext(2);
      emitter.onComplete();
    }
 }, BackpressureStrategy.BUFFER).subscribe(new FlowableSubscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
            }

            @Override
            public void onNext(Integer value) {
                //可以处理返回的值
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {

            }
        });
上一篇下一篇

猜你喜欢

热点阅读