Android Developer

Android Develop—— RxJava2背压问题以及背

2017-09-11  本文已影响283人  So_ProbuING

RxJava2背压问题描述

场景

简单一点说,还是引用以前的那个场景,在zip操作符的两个水管中,如果出现一个水管A发送事件特别快,而另一个水管B发送事件特别慢,那就可能出现发得快的水管A已经发送了大量的事件,而发送慢的水管B才发出一个事件。在这种情况下,zip给我们的每一根水管都弄了一个水缸,用来保存这些事件。水缸的特点是按顺序保存,先进来的事件先取出来,这个特点就是队列,而zip的实现也是采用队列的形式。使用一个长度为128的队列来存储事件。
在这种情况下,随着内存占用的不断升高,就会造成内存的溢出,也就是我们所说的oom

当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件.

当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是我们刚才说的水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了.

解决方案

Flowable

RxJava提供给我们Flowable、Subscriber分别表示生成者和消费者或者是上、下游

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意这句代码
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);
s.request(Long.MAX_VALUE)

Flowable在设计的时候采用了一种新的设计思路就是响应式拉取的方式,来解决上下游流苏不均衡的问题,就是下游会提供处理能力来告知上游,当调用Subscription.request(1)时,就代表下游的处理能力是1个,然后上游就拿出一个来供下游使用。这是完美的解决背压问题的方法
同样的如果下游没有调用request()方法时,上游就会认为下游没有处理问题的能力,既然下游处理不了,上游又无法继续等待,所以就会抛出异常,在异步中,由于水缸的大小是有限制的128所以在存放的事件或者数据超过128后,再继续存数据就会
导致抛出异常。

背压的处理策略

public static void request() {
        mSubscription.request(128);
    }

public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

对于使用操作符直接返回的情况,RxJava给我们提供了对应的方法来指定不同的策略

Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背压策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

FlowableEmitter

FlowableEmitter是个接口,继承Emitter,Emitter里面就是我们的onNext(),onComplete()和onError()三个方法。我们看到FlowableEmitter中有这么一个方法:

long requested()

该方法的作用就是获取当前外部请求的数量,调用该方法就会获取到下游所调用的request(count)返回count书

下游调用request(n) 告诉上游它的处理能力,上游每发送一个next事件之后,requested就减一,注意是next事件,complete和error事件不会消耗requested,当减到0时,则代表下游没有处理能力了,这个时候你如果继续发送事件,会发生什么后果呢?当然是MissingBackpressureException

当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

异步request.jpg
上一篇下一篇

猜你喜欢

热点阅读