RxJava 背压源码解析

2020-06-24  本文已影响0人  你可记得叫安可

例子

Flowable.create<Int>({ emitter ->
        println("观察者可接受事件数量 = ${emitter.requested()}")
        var flag = false
        for (i in 0..499) {
            flag = false
            // 若 requested() == 0 则不发送
            while (emitter.requested() == 0L) {
                if (!flag) {
                    println("不再发送")
                    flag = true
                }
            }
            println("发送了事件 $i,观察者可接受事件数量 = ${emitter.requested()}")
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe(object : Subscriber<Int> {
            override fun onComplete() {
                println("onComplete")
            }

            override fun onSubscribe(s: Subscription) {
                println("onSubscribe")
                subscribption = s
            }

            override fun onNext(t: Int) {
                println("接收到了事件 $t")
            }

            override fun onError(t: Throwable) {
                println("onError $t")
            }
        })

    Thread.sleep(200)
    println("第一次按钮 48")
    subscribption?.request(48)

    Thread.sleep(200)
    println("第二次按钮 48")
    subscribption?.request(48)

    Thread.sleep(200)
    println("第三次按钮 48")
    subscribption?.request(48)

上面例子中做的事情很简单:
数据源一共有 500 个数据要发射,当 emitter.requested() == 0 时就不再发射,背压策略是 BackpressureStrategy.ERROR,发射线程是 Schedulers.io(),接受线程切换到 Schedulers.computation()。外部通过 subscription.request(48) 一共 3 次请求 48 个数据,每次请求都有间隔 200ms,以保证每次请求后,上游数据都能发射完成。
运行结果

观察者可接受事件数量 = 128
发送了事件 0,观察者可接受事件数量 = 128
发送了事件 1,观察者可接受事件数量 = 127
发送了事件 2,观察者可接受事件数量 = 126
...
发送了事件 125,观察者可接受事件数量 = 3
发送了事件 126,观察者可接受事件数量 = 2
发送了事件 127,观察者可接受事件数量 = 1
不再发送
// 200ms 后
第一次按钮 48
接收到了事件 0
接收到了事件 1
...
接收到了事件 45
接收到了事件 46
接收到了事件 47
// 200ms 后
第二次按钮 48
接收到了事件 48
接收到了事件 49
接收到了事件 50
...
接收到了事件 93
接收到了事件 94
接收到了事件 95
发送了事件 128,观察者可接受事件数量 = 96
发送了事件 129,观察者可接受事件数量 = 95
发送了事件 130,观察者可接受事件数量 = 94
...
发送了事件 221,观察者可接受事件数量 = 3
发送了事件 222,观察者可接受事件数量 = 2
发送了事件 223,观察者可接受事件数量 = 1
// 200ms 后
不再发送
第三次按钮 48
接收到了事件 96
接收到了事件 97
接收到了事件 98
接收到了事件 99
...
接收到了事件 141
接收到了事件 142
接收到了事件 143
END

从上面结果我们可以看到现象:

下面我们来看一下如何通过源码解释上面的现象。了解 RxJava 普通操作符源码的同学都知道,Flowable.create() 实际上创建了一个 FlowableCreate 实例。observeOn(Schedulers.computation()) 实际上创建了一个 FlowableObserveOn 实例,并在其中发射消息时切换了线程。

FlowableCreate

有了之前源码分析的经验,我们只用关注 subscribeActual() 方法:

@Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
    }
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
    }
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
    }
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
    }
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
    }
    }
    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

根据创建 Flowable 时所传入的不同策略,创建了不同的 Emitter

MissingEmitter
@Override
public void onNext(T t) {
    if (isCancelled()) {
        return;
    }
    if (t != null) {
        // 将数据发给下游
        downstream.onNext(t);
    } else {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    // 检查自己的计数器
    for (;;) {
        long r = get();
        if (r == 0L || compareAndSet(r, r - 1)) {
            return;
        }
    }
}

如果有数据往下发(开发者主动调用了 onNext 来发射数据),则直接发给下游。并且给自己的计数器 -1,如果计数器本身就为 0 了则不再 -1.Emitter 自己就是一个计数器,它是如何被赋值的呢,我们看看基类 BaseEmitter

abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
    final Subscriber<? super T> downstream;
    BaseEmitter(Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    //... 省略了不是背压处理相关的函数:error, complete 等,跟普通的 Observable 差不多
    
    // 设置计数器的值
    @Override
    public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
            onRequested();
        }
    }
    void onRequested() {
        // default is no-op
    }
    // 获取计数器的值
    @Override
    public final long requested() {
        return get();
    }
}

可以看到 Emitter 自己的计数器的值是靠 request(n) 方法来设置的。

我们之后可以看到,这个 request(n) 方法是其下游 xxxSubscriber.onSubscribe() 方法里来更新上游的。

ErrorAsyncEmitter
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    ErrorAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }
    @Override
    void onOverflow() {
        onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
    }
}

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
    NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }
    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 计数器 != 0 则往下发射数据,并且计数器减 1
        if (get() != 0) {
            downstream.onNext(t);
            BackpressureHelper.produced(this, 1);
        } else {
            // 如果计数器 == 0,上游还要往下发,则调用 onOverflow() 来处理溢出
            onOverflow();
        }
    }
    abstract void onOverflow();
}

ErrorAsyncEmitter 类继承自 NoOverflowBaseAsyncEmitter。不同于上面的 MissingEmitter 是收到数据后直接往下发,然后计数器 -1,ErrorAsyncEmitter 是计数器不为 0 才会往下发射数据,如果计数器为 0 了,则做溢出处理:往下游通知 onError

DropAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    private static final long serialVersionUID = 8360058422307496563L;
    DropAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }
    @Override
    void onOverflow() {
        // nothing to do
    }
}

ErrorAsyncEmitter 差不多,只不过溢出时 onOverflow() 不做任何处理。

LatestAsyncEmitter

这个的实现复杂一点,代码就不贴了。这个策略主要是当缓冲区满了的时候,如果上游再往下发射数据,LatestAsyncEmitter 会缓存最近一次发射的数据。当下游再 request 数据时,会将缓存的数据也一起发射下去。

缓冲区

上面的所有 Emitter 我们都只看到了计数器(Emitter 自身就是一个计数器),计数器帮助 Emitter 执行背压策略来发射数据。那么发射的数据是在哪里被缓存的呢?思考一下,数据缓存只有在异步的时候才需要,而在什么时候我们能够明确地知道是异步操作呢?那就是在调用切线程的操作符时。因此我们看看 FlowableObserveOn

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
    final boolean delayError;
    final int prefetch;
    public FlowableObserveOn(
            Flowable<T> source,
            Scheduler scheduler,
            boolean delayError,
            int prefetch) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        // 缓冲区大小
        this.prefetch = prefetch;
    }
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new ObserveOnConditionalSubscriber<T>(
                    (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
        } else {
            // 走这里,构造 ObserveOnSubscriber 对象
            source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
        }
    }
}
构造缓冲区

上面主要需要关注的就是 prefetch 值。该值是由开发者调用 observeOn 时传入的,默认是 128。然后将这个缓冲区大小传入 ObserveOnSubscriber 的构造函数。我们先看下 ObserveOnSubscriber.onSubscribe() 方法:

@Override
public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.validate(this.upstream, s)) {
        this.upstream = s;
        // ... 省去一些代码
        
        queue = new SpscArrayQueue<T>(prefetch);
        downstream.onSubscribe(this);
        s.request(prefetch);
    }
}

可以看到构造了一个 prefetch 大小的 SpscArrayQueue,并且依次向下游调用 onSubscribe(),向上游 request(prefetch)。还记得上一节中,各个 Emitter 的计数器是怎么来的吗?就是下游主动 request 的。因此我们开头的例子中 ErrorAsyncEmitter 的计数器大小是由这个 ObserveOnSubscriber 主动 request(128) 的,,也就是 128 大小,这也符合我们的 log 中的现象。

设置 limit 为 2/3 缓冲区大小

我们再看构造 FlowableObserveOn 时调用的父类 BaseObserveOnSubscriber

BaseObserveOnSubscriber(
        Worker worker,
        boolean delayError,
        int prefetch) {
    this.worker = worker;
    this.delayError = delayError;
    this.prefetch = prefetch;
    this.requested = new AtomicLong();
    // 这里 limit 设置为缓冲区大小的 2/3
    this.limit = prefetch - (prefetch >> 2);
}

可以看到上面设置了一个 limit 为缓冲区 prefetch 的 2/3 大小。参照我们例子中的 log:

缓冲区的消费行为

直接看 onNext() 方法:

@Override
public final void onNext(T t) {
    if (done) {
        return;
    }
    if (!queue.offer(t)) {
        upstream.cancel();
        // 这就是采用 MISSING 策略时,缓冲区满的时候的报错
        error = new MissingBackpressureException("Queue is full?!");
        done = true;
    }
    trySchedule();
}

在这里我们发现了 MISSING 策略的报错信息。根据我们上面的分析,MISSING 策略就是无脑往下游发射数据,到 BaseObserveOnSubscriber.onNext() 这里时,queue.offer(t) 返回 false,表示缓冲区已满,于是向下游发送 error。因此我们知道了,MISSING 的意思其实是 missing 背压策略,而不是 missing 发射的数据
接着调用 trySchedule(),我们来看看:

final void trySchedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    // 这里切线程执行任务
    worker.schedule(this);
}

上面通过 worker 来切换线程,最终执行的是 ObserveOnSubscriber.runAsync()

@Override
void runAsync() {
    int missed = 1;
    final Subscriber<? super T> a = downstream;
    final SimpleQueue<T> q = queue;
    long e = produced;
    for (;;) {
        // 下游观察者通过调用 request 设置的数目。在我们的例子中,就是 subscription 每次 request 的 48 个数据
        long r = requested.get();
        // 死循环,直到发射下游请求的数目个数据
        while (e != r) {
            boolean d = done;
            T v;
            try {
                // 从缓存中取出数据
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                cancelled = true;
                upstream.cancel();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {
                return;
            }
            if (empty) {
                break;
            }
            a.onNext(v);
            e++;
            // 如果已发射数据 == limit(缓冲区的 2/3),则向上游请求 limit 个数据,并且一次性更新 request 数据
            if (e == limit) {
                if (r != Long.MAX_VALUE) {
                    r = requested.addAndGet(-e);
                }
                upstream.request(e);
                e = 0L;
            }
        }
        if (e == r && checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        int w = get();
        if (missed == w) {
            produced = e;
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        } else {
            missed = w;
        }
    }
}

上面的关键逻辑其实很简单,而且也能解释我们例子中 log 的现象:从缓冲区中一直取数据往下发射,直到发射了下游 request 个数据。如果还没满足下游请求,发射数据就已经达到了缓冲区 2/3 个,那么就主动向上游请求 2/3 个数据,以填满缓冲区。

这就解释了为什么例子中下游观察者只有消费了 96 个以上数据,上游才会继续往下发射 96 个数据(注意是发射缓冲区 2/3 个,如果缓冲区空余大于 2/3,那么上游也只会发射 2/3 个)

多个背压策略

如果我们创建多个背压 Flowable 后,再使用 onBackpressureLatest(), onBackpressureDrop 等背压操作符,那么最后的背压策略是以哪个为准呢?通过看源码我们可以知道,每一个背压操作符在 onSubscribe() 方法中,都是固定向上 request(Long.MAX_VALUE),这样就会使上游的背压策略中的计数器变为无限大,而使上游的背压策略失效。因此,背压策略的操作符总是以最后一个为准。

中间的普通操作符有背压吗?

我们看 FlowableMap,它构造的是 MapConditionalSubscriber,它没有像背压操作符一样继承自 AtomicLong,从而变为一个计数器(计数器被用于判断是否执行背压策略),只是简单地将数据做变换后发给下游。因此背压策略的操作在这些普通操作符上是不生效的。

上一篇 下一篇

猜你喜欢

热点阅读