再忆RxJava---背压策略

2019-08-25  本文已影响0人  勇敢地追

1 背压存在的背景

被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM

2 背压策略的原理

3 背压具体情况讨论

3.1 同步策略

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) {
                Log.e("emitter", "发送1");
                emitter.onNext("111");
                Log.e("emitter", "发送2");
                emitter.onNext("222");
                Log.e("emitter", "发送3");
                emitter.onNext("333");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                Log.e("emitter", "接受" + s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e("onError", t.getLocalizedMessage());
            }

            @Override
            public void onComplete() {

            }
        });

其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个
总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的

3.2 异步

先来看几段代码
FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法

        public final void onNext(T t) {
            。。。。。。
            if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走
(可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值)
再来看BaseObserveOnSubscriber的onNext方法

        @Override
        public final void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode == ASYNC) {
                trySchedule();
                return;
            }
            if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
                //也就是最上面get为什么是128的原因
                //此时还没到Handler,所以还是子线程
                upstream.cancel();
                error = new MissingBackpressureException("Queue is full?!");
                done = true;
            }
            trySchedule();
        }

接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext
runAsync主要注意produced和requested.get()

总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)

3.2.1 控制被观察者发送事件的速度---反馈控制

由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力
可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管

3.2.2 控制观察者接收事件的速度---响应式拉取

比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等

参考文献
https://www.jianshu.com/p/ceb48ed8719d

上一篇 下一篇

猜你喜欢

热点阅读