
RxJava: Backpressure Strategies

2019-07-24  103style

Please credit the source with a link when reposting.
This article is from: 103style's blog

This article is based on RxJava 2.x.


Contents

Introduction to RxJava's backpressure strategies
Why missing backpressure crashes an Observable
How to use Flowable
Source analysis of the five backpressure strategies
Summary


Introduction to RxJava's backpressure strategies

The official documentation describes it as follows:

Backpressure is when in a Flowable processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down.

So RxJava's backpressure strategies (Backpressure) are policies for handling this "upstream emits faster than downstream can consume" situation, much like the saturation policy (RejectedExecutionHandler) of Java's thread pools.
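To make the analogy concrete, here is a minimal sketch of the thread-pool side (the class name SaturationDemo and the pool sizes are just for this demo): a bounded queue fills up, and the RejectedExecutionHandler decides what happens to the overflow, just as a backpressure strategy decides what to do with events the downstream can't absorb yet.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SaturationDemo {
    public static void main(String[] args) {
        // One worker thread and a bounded queue of 2 tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                // Saturation policy: silently drop tasks that don't fit,
                // analogous to BackpressureStrategy.DROP below.
                new ThreadPoolExecutor.DiscardPolicy());

        for (int i = 0; i < 10; i++) {
            final int n = i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000); // slow consumer
                    } catch (InterruptedException ignored) {
                    }
                    System.out.println(n);
                }
            });
        }
        pool.shutdown();
        // Only task 0 (running) and tasks 1-2 (queued) survive; 3-9 are discarded.
    }
}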


Why missing backpressure crashes an Observable

Let's first see what happens when we use an Observable:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // Emit integers as fast as the CPU allows, forever.
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        })
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // Consume slowly: one event every 3 seconds.
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(integer);
            }
        });

Output:

I/art: Background partial concurrent mark sweep GC freed 7(224B) AllocSpace objects, 0(0B) LOS objects, 27% free, 43MB/59MB, paused 528us total 106.928ms
I/System.out: 0
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 20% free, 62MB/78MB, paused 1.065ms total 327.346ms
I/System.out: 1
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 16% free, 82MB/98MB, paused 1.345ms total 299.700ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 13% free, 103MB/119MB, paused 1.609ms total 377.432ms
I/System.out: 2
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 120MB/120MB, paused 1.280ms total 105.749ms
I/art: Background partial concurrent mark sweep GC freed 22(640B) AllocSpace objects, 0(0B) LOS objects, 11% free, 126MB/142MB, paused 1.818ms total 679.398ms
I/System.out: 3
I/art: Background partial concurrent mark sweep GC freed 9(288B) AllocSpace objects, 0(0B) LOS objects, 9% free, 148MB/164MB, paused 1.946ms total 555.619ms
I/System.out: 4
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 165MB/165MB, paused 1.253ms total 107.036ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 8% free, 172MB/188MB, paused 2.355ms total 570.029ms
I/art: Background sticky concurrent mark sweep GC freed 28(768B) AllocSpace objects, 0(0B) LOS objects, 0% free, 188MB/188MB, paused 11.474ms total 82.399ms
I/System.out: 5
I/art: Background partial concurrent mark sweep GC freed 23(672B) AllocSpace objects, 0(0B) LOS objects, 7% free, 197MB/213MB, paused 2.355ms total 631.635ms
I/art: Background partial concurrent mark sweep GC freed 22(640B) AllocSpace objects, 0(0B) LOS objects, 6% free, 226MB/242MB, paused 3.091ms total 908.581ms
I/System.out: 6
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 242MB/242MB, paused 1.672ms total 102.676ms
I/art: Waiting for a blocking GC Alloc
I/art: Clamp target GC heap from 267MB to 256MB
I/art: Alloc sticky concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.581ms total 10.336ms
I/art: WaitForGcToComplete blocked for 12.447ms for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Starting a blocking GC Alloc
I/System.out: 9
I/art: Waiting for a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
I/art: Clamp target GC heap from 268MB to 256MB
I/art: Alloc concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.574ms total 818.037ms
I/art: WaitForGcToComplete blocked for 2.539s for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
W/art: Throwing OutOfMemoryError "Failed to allocate a 12 byte allocation with 4109520 free bytes and 3MB until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)"

From the log above we can see memory climbing steadily; once the heap hits the 256MB cap, GC runs back to back, and an OutOfMemoryError is finally thrown, because the upstream emits events far faster than the downstream consumer can process them.

So what is the source of this memory explosion?


Let's make one small change to the code above and comment out observeOn(AndroidSchedulers.mainThread()): memory usage now looks perfectly normal and the problem disappears.

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        emitter.onNext(i);
                    }
                }
            })
            .subscribeOn(Schedulers.computation())
//          .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(integer);
                }
            });
(memory chart with observeOn commented out: usage stays normal)

So the source of the memory explosion is observeOn(AndroidSchedulers.mainThread()). Without it, emitter.onNext(i) invokes the downstream Consumer synchronously on the same computation thread, so each emission has to wait out the 3-second sleep and the producer can never run ahead.

Let's look at the source of observeOn. From RxJava subscribeOn和observeOn源码介绍 we know that ObservableObserveOn.ObserveOnObserver builds, in onSubscribe, an unbounded SpscLinkedArrayQueue whose initial capacity defaults to 128:

queue = new SpscLinkedArrayQueue<T>(bufferSize);

Every event the upstream emits is saved into this SpscLinkedArrayQueue via queue.offer(t):

public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        // The queue is unbounded: offer(t) always succeeds,
        // so a fast producer just makes it grow and grow.
        queue.offer(t);
    }
    schedule();
}

We can write a small test to see this for ourselves. Since production is far faster than consumption, it is effectively the same as adding elements forever:

private void test() {
    // io.reactivex.internal.queue.SpscLinkedArrayQueue, initial capacity 128
    SpscLinkedArrayQueue<Integer> queue = new SpscLinkedArrayQueue<>(128);
    for (int i = 0; ; i++) {
        queue.offer(i); // never fails: the queue just links in new array segments
    }
}

Run it and you'll see memory spike just as quickly as with the Observable.

(memory chart of the test code: the same rapid growth)

A detailed look at SpscLinkedArrayQueue will have to wait for another post. For now, just picture it as eating non-stop until the belly finally bursts.
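Incidentally, an existing Observable can opt in to backpressure by converting it to a Flowable with toFlowable(...), picking one of the strategies introduced in the next section. A minimal sketch, where source stands for any Observable<Integer> such as the one above and DROP is chosen just for illustration:

source.toFlowable(BackpressureStrategy.DROP) // the boundary where overflow is handled
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(Integer integer) throws Exception {
              Thread.sleep(3000);
              System.out.println(integer); // memory stays bounded; excess events are dropped
          }
      });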


How to use Flowable

Let's look at how Flowable is used:

Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)

BackpressureStrategy has five modes: MISSING, ERROR, BUFFER, DROP, and LATEST.

Below is a rundown of each of the five BackpressureStrategy modes: how it is used and what it does when events are emitted faster than they are consumed (a runnable sketch follows the list):
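In brief, as documented for RxJava 2.x:

MISSING: onNext events are passed through with no buffering or dropping; the downstream (or an onBackpressureXXX operator applied later) must cope with overflow itself, otherwise a MissingBackpressureException may surface.
ERROR: signals a MissingBackpressureException as soon as the downstream cannot keep up.
BUFFER: buffers every event in an unbounded buffer until the downstream consumes it; memory can still blow up, exactly like the Observable case.
DROP: discards the incoming event whenever the downstream has no outstanding requests.
LATEST: like DROP, but always retains the most recent event, so the downstream gets the latest value once it requests again.

Here is a minimal runnable sketch mirroring the earlier Observable example (same schedulers, same 3-second consumer; swap in any of the five strategies to compare):

Flowable
        .create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                // Same runaway producer as before.
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP) // try ERROR to see MissingBackpressureException instead
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(3000); // slow consumer, but memory now stays bounded
                System.out.println(integer);
            }
        });

With DROP, Flowable's observeOn prefetches 128 items into a bounded queue; everything emitted while no requests are outstanding is discarded, so the output runs 0..127 and then jumps ahead in large gaps.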


Source analysis of the five backpressure strategies

As covered in the earlier article RxJava之create操作符源码解析, we know that Flowable.create(new FlowableOnSubscribe<Object>(){...}, BackpressureStrategy.LATEST) returns a FlowableCreate object.

FlowableCreate creates a different emitter for each backpressure strategy:

public final class FlowableCreate<T> extends Flowable<T> {
    //...
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        switch (backpressure) {
            case MISSING: {
                // no backpressure handling at all
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                // signals MissingBackpressureException on overflow
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                // silently discards events on overflow
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                // keeps only the most recent event
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                // BUFFER: unbounded buffering
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
    //...
}
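What each emitter does when the downstream has no outstanding requests is easiest to see in the common base class of DropAsyncEmitter and ErrorAsyncEmitter. A simplified paraphrase of the 2.x source (not a verbatim copy):

// NoOverflowBaseAsyncEmitter (simplified): DROP and ERROR differ only in onOverflow().
@Override
public final void onNext(T t) {
    if (isCancelled()) {
        return;
    }
    if (get() != 0) {
        // Outstanding requests remain: deliver the value and count one down.
        downstream.onNext(t);
        BackpressureHelper.produced(this, 1);
    } else {
        // No requests left:
        //   DropAsyncEmitter.onOverflow()  -> does nothing, the event is lost
        //   ErrorAsyncEmitter.onOverflow() -> onError(new MissingBackpressureException(...))
        onOverflow();
    }
}

MissingEmitter, by contrast, calls downstream.onNext(t) unconditionally; BufferAsyncEmitter parks events in a SpscLinkedArrayQueue (unbounded, the same structure that bit us in the Observable case); and LatestAsyncEmitter holds a single AtomicReference that every new event overwrites.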





Summary

Observable has no backpressure support: observeOn parks every upstream event in an unbounded SpscLinkedArrayQueue, so a producer that outruns its consumer eventually ends in OutOfMemoryError. Flowable makes you choose a BackpressureStrategy up front, and FlowableCreate maps each strategy to a dedicated emitter that decides whether to buffer, drop, keep only the latest, signal an error, or do nothing when the downstream falls behind.




That's all.
