RxJava源码分析(二)基本的数据流分析(有背压)

2018-08-12  本文已影响0人  kakaxicm

引言

上篇文章中,我们了解了RxJava基本的无背压数据流实现原理,本篇我们依然从案例着手,学习有背压下数据流响应实现。何为背压?大多数情况下,上游发射数据的速度大于下游处理数据的速度,背压策略就是控制数据流速,在RxJava中通过设置下游的处理能力实现“响应式拉取”解决背压问题。

样例

下面是同步订阅带背压的样例:

private void testBackPressure() {
        //同步订阅事件,发送一个接收一个,不会出现被观察者发送事件速度 > 观察者接收事件速度的情况。
        // 可是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题
        Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR);//背压策略,下游接收不到数据的时候报MissingBackpressureException异常

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {

                Log.e(TAG, "Flowable:onSubscribe");
                //设置下游接收的事件个数,如果不设置
                //这里只接收两个事件
                //同步订阅的情况下,如果不设置接收能力,也会报背压异常
                s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "Flowable:onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.e(TAG, "Flowable:onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "Flowable:onComplete: ");
            }
        };

        upstream.subscribe(subscriber);
    }

可见写法和无背压的模式基本一致,而里面出现的元素也和无背压一一对应:
1.Flowable:被观察者,对应Observable;
2.FlowableOnSubscribe:中间件,类似于“管道”,用于发射数据,对应ObservableOnSubscribe;

  1. FlowableEmitter:中间件,类似于“泵”,用于真正发射数据,对应ObservableEmitter;
  2. Subscription:数据流管道开关,类似于“阀门”,对应于Disposable,它可以控制下游的处理能力;
  3. Subscriber:被观察者,对应Observer。
    好了找到对应关系,我们猜想数据流的流向也是和无背压基本一致,事实上也是如此,因此本篇不再重点说明流向,而侧重点放到“响应式拉取”的实现上。由于和无背压的流程相似,不详细阐述他们之间的关系,不熟悉的请看上篇博客。下面先大概看看各元素的构成:

Flowable

public abstract class Flowable<T> implements Publisher<T> {
....
}
public interface Publisher<T> {

    /**
     * Request {@link Publisher} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * <p>
     * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
     * signal the error via {@link Subscriber#onError}.
     *
     * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
     */
    public void subscribe(Subscriber<? super T> s);
}

实现了Publisher接口的subscribe方法:

 @BackpressureSupport(BackpressureKind.SPECIAL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Subscriber<? super T> s) {
        ...
        try {
            s = RxJavaPlugins.onSubscribe(this, s);
            ....
            subscribeActual(s);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

依然调用的subscribeActual抽象方法,我们接着看create操作符返回的FlowableCreate,它实现了subscribeActual方法。

FlowableCreate

public final class FlowableCreate<T> extends Flowable<T> {
    //用户传入的FlowableOnSubscribe
    final FlowableOnSubscribe<T> source;
    //背压策略
    final BackpressureStrategy backpressure;
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }
 @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根据背压策略,构造对应的发射器
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        //订阅时的观察方法,在使用的时候,需要在这个方法里面设置下游的处理事件个数
        t.onSubscribe(emitter);
        try {
            //通过emitter发射数据,用户实现
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}

和ObservableCreate类似,封装了FlowableOnSubscribe和背压策略,在subscribeActual中根据背压策略和传入的观察者设置不同的发射器,然后通过source.subscribe(emitter)发射数据,在这方法里,emitter封装了观察者Subscriber,最终调用它的接收数据方法。
下面我们着重看看发射器的结构,它们既然和背压策略紧密联系,那么背压功能必然由它们实现。

FlowableEmitter族

FlowableEmitter接口

先看它们的顶层接口:

public interface FlowableEmitter<T> extends Emitter<T> {
  ...
  long requested();
  ...
}

其他方法和ObservableEmitter方法基本一致,这里多了requested方法,它用于返回当前可以接受事件的个数。另外发射器族还实现了Subscription接口:

public interface Subscription {
        //设置下游接收能力,n为事件个数,多次调用时,容量累加
        public void request(long n);
       //断开管道
        public void cancel();
}

所以发射器拥有设置下游接收容量和断开管道的功能。
接下来我们再看看背压数据流发射器的基类BaseEmitter:

BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;
        //真正的被观察者
        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            if (isCancelled()) {
                return;
            }
            try {
                //观察者onComplete
                actual.onComplete();
            } finally {
                serial.dispose();
            }
        }

        @Override
        public void onError(Throwable e) {
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (isCancelled()) {
                RxJavaPlugins.onError(e);
                return;
            }
            try {
                //观察者onError
                actual.onError(e);
            } finally {
                serial.dispose();
            }
        }

        @Override
        public final void cancel() {
            serial.dispose();
            onUnsubscribed();
        }

        void onUnsubscribed() {
            // default is no-op
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }
        
        //重点:调用层调用这个方法设置处理能力
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                //计数器+n
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable s) {
            serial.update(s);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            //返回当前可以处理的事件个数
            return get();
        }

        @Override
        public final FlowableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }
    }

继承AtomicLong类,说明发射器是原子操作的Long型数,本质是一个计数器。主要覆写了onError、onComplete等方法,其中request方法是设置Long型数,代表当前发射器的事件容量,调用者在接收数据的onSubscribe方法中必须调用这个方法,设置容量。至于其他方法的实现我们再看他的子类实现。

NoOverflowBaseAsyncEmitter

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //取当前剩余容量
            if (get() != 0) {
                 //被观察者onNext方法
                actual.onNext(t);
                //容量-1
                BackpressureHelper.produced(this, 1);
            } else {
                //剩余容量为0,超出处理能力交给子类实现
                onOverflow();
            }
        }
        //溢出的数据再这个方法中处理,不同的背压策略有不同的实现
        abstract void onOverflow();
    }

到目前为止,我们知道了request方法是做+计数,onNext方法是做减计数,在同步订阅方法中通过这两点实现响应式拉取,即:我能吃多少就只能吃多少,超过的事件交给onOverflow处理。接下来我们看看具体的几个发射器实现

发射器的实现

MissingEmitter

MissingEmitter对应MISSING策略:

static final class MissingEmitter<T> extends BaseEmitter<T> {
        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            for (;;) {
                long r = get();
                //当前计数器为0或者安全-1返回,则退出循环
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }

发现它没有继承NoOverflowBaseAsyncEmitter,所以没有做溢出处理,在onNext方法中检查当前计数器的值,安全减1或者计数值为0时返回,可见它接收到事件能处理就处理,处理不了就睁一只鸭闭一只眼直接返回。

ErrorAsyncEmitter

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            //事件溢出处理
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }

ErrorAsyncEmitter在事件溢出时直接抛出背压异常。

DropAsyncEmitter

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {


        private static final long serialVersionUID = 8360058422307496563L;

        DropAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            //事件溢出,就当没看见
        }

    }

到目前为止,我们梳理了背压原理,其实本质就是在发射数据的时候设置了计数器,没接收一个事件计数器减一,背压策略就是处理数据溢出的情况。

上一篇下一篇

猜你喜欢

热点阅读