阿里开源技术spring webflux

响应式编程——SubmissionPublisher

2019-07-30  本文已影响0人  王侦

1.官方文档

A Flow.Publisher that asynchronously issues submitted (non-null) 
items to current subscribers until it is closed. Each current 
subscriber receives newly submitted items in the same order 
unless drops or exceptions are encountered. Using a 
SubmissionPublisher allows item generators to act as compliant 
reactive-streams Publishers relying on drop handling and/or 
blocking for flow control.

A SubmissionPublisher uses the Executor supplied in its 
constructor for delivery to subscribers. The best choice of Executor 
depends on expected usage. If the generator(s) of submitted items 
run in separate threads, and the number of subscribers can be 
estimated, consider using a Executors.newFixedThreadPool(int). 
Otherwise consider using the default, normally the 
ForkJoinPool.commonPool().

Buffering allows producers and consumers to transiently operate 
at different rates. Each subscriber uses an independent buffer. 
Buffers are created upon first use and expanded as needed up to 
the given maximum. (The enforced capacity may be rounded up to 
the nearest power of two and/or bounded by the largest value 
supported by this implementation.) Invocations of request do not 
directly result in buffer expansion, but risk saturation if unfilled 
requests exceed the maximum capacity. The default value of 
Flow.defaultBufferSize() may provide a useful starting point for 
choosing a capacity based on expected rates, resources, and 
usages.

Publication methods support different policies about what to do 
when buffers are saturated. Method submit blocks until resources 
are available. This is simplest, but least responsive. The offer 
methods may drop items (either immediately or with bounded 
timeout), but provide an opportunity to interpose a handler and 
then retry.

If any Subscriber method throws an exception, its subscription is 
cancelled. If a handler is supplied as a constructor argument, it is 
invoked before cancellation upon an exception in method onNext, 
but exceptions in methods onSubscribe, onError and onComplete 
are not recorded or handled before cancellation. If the supplied 
Executor throws RejectedExecutionException (or any other 
RuntimeException or Error) when attempting to execute a task, or 
a drop handler throws an exception when processing a dropped 
item, then the exception is rethrown. In these cases, not all 
subscribers will have been issued the published item. It is usually 
good practice to closeExceptionally in these cases.

Method consume(Consumer) simplifies support for a common 
case in which the only action of a subscriber is to request and 
process all items using a supplied function.

This class may also serve as a convenient base for subclasses 
that generate items, and use the methods in this class to publish 
them. For example here is a class that periodically publishes the 
items generated from a supplier. (In practice you might add 
methods to independently start and stop generation, to share 
Executors among publishers, and so on, or use a 
SubmissionPublisher as a component rather than a superclass.)

一个Flow.Publisher,它将提交的(非空)元素异步发送给当前订阅者,直到它被关闭。除非遇到丢弃或异常,否则每个当前订户以相同的顺序接收新提交的项目。使用SubmissionPublisher允许元素生成器充当兼容的reactive-streams Publishers依赖于丢弃处理和/或阻塞流控制。

SubmissionPublisher使用其构造函数中提供的Executor传递给订阅者。 Executor的最佳选择取决于预期的用途。如果提交元素的生成器在独立的线程中运行,并且可以估计subscribers的数量,请考虑使用Executors.newFixedThreadPool(int)。否则考虑使用默认值,通常是ForkJoinPool.commonPool()。

缓冲允许生产者和消费者以不同的速率进行瞬时操作。每个subscriber使用独立的缓冲区。缓冲区在首次使用时创建,并根据需要扩展到给定的最大值。 (强制容量可以四舍五入到最接近的2的幂和/或由此实现支持的最大值限制。)调用request不直接导致缓冲区扩展,但是如果未填充的requests超过最大容量则有风险饱和。 Flow.defaultBufferSize()的默认值可以为根据预期的速率、资源和用法选择容量提供有用的起点。

Publication方法支持有关缓冲区饱和时要执行的操作的不同策略。方法submit阻塞直到资源可用。这是最简单的,但responsive最不好。 offer方法可能会丢弃项目(立即或有限超时),但提供插入处理器然后重试的机会。

如果任何Subscriber方法抛出异常,则其订阅将被取消。如果提供了处理器作为构造函数参数,则在取消onNext方法中的异常之前调用它,但在取消之前不会记录或处理onSubscribe、onError和onComplete方法中的异常。如果提供的Executor在尝试执行任务时抛出RejectedExecutionException(或任何其他RuntimeException或Error),或者drop处理器在处理已删除的元素时抛出异常,则会重新抛出异常。在这些情况下,并非所有订阅者都已发出已发布的项目。在这些情况下,通常closeExceptionally。

方法consume(Consumer)简化了对常见情况的支持,其中订阅者的唯一操作是使用supplied的函数请求和处理所有项目。

此类还可以作为生成元素的子类的基类,并使用此类中的方法发布它们。例如,这是一个周期性发布supplier生成的元素的类。 (实际上,您可以添加独立启动和停止生成的方法,在发布者之间共享Executor等,或者使用SubmissionPublisher作为组件而不是超类。)

 class PeriodicPublisher<T> extends SubmissionPublisher<T> {
   final ScheduledFuture<?> periodicTask;
   final ScheduledExecutorService scheduler;
   PeriodicPublisher(Executor executor, int maxBufferCapacity,
                     Supplier<? extends T> supplier,
                     long period, TimeUnit unit) {
     super(executor, maxBufferCapacity);
     scheduler = new ScheduledThreadPoolExecutor(1);
     periodicTask = scheduler.scheduleAtFixedRate(
       () -> submit(supplier.get()), 0, period, unit);
   }
   public void close() {
     periodicTask.cancel(false);
     scheduler.shutdown();
     super.close();
   }
 }
Here is an example of a Flow.Processor implementation. It uses 
single-step requests to its publisher for simplicity of illustration. A 
more adaptive version could monitor flow using the lag estimate 
returned from submit, along with other utility methods.

以下是Flow.Processor实现的示例。 它使用单步请求(向publisher),以简化说明。 更自适应的版本可以使用提交返回的滞后估计以及其他实用方法来监控流量。

 class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     (this.subscription = subscription).request(1);
   }
   public void onNext(S item) {
     subscription.request(1);
     submit(function.apply(item));
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }

2.BufferedSubscription

A resizable array-based ring buffer with integrated control to
start a consumer task whenever items are available.  The buffer
algorithm is specialized for the case of at most one concurrent
producer and consumer, and power of two buffer sizes. It relies
primarily on atomic operations (CAS or getAndSet) at the next
array slot to put or take an element, at the "tail" and "head"
indices written only by the producer and consumer respectively.

We ensure internally that there is at most one active consumer
task at any given time. The publisher guarantees a single
producer via its lock. Sync among producers and consumers
relies on volatile fields "ctl", "demand", and "waiting" (along
with element access). Other variables are accessed in plain
mode, relying on outer ordering and exclusion, and/or enclosing
them within other volatile accesses. Some atomic operations are
avoided by tracking single threaded ownership by producers (in
the style of biased locking).

Execution control and protocol state are managed using field
"ctl".  Methods to subscribe, close, request, and cancel set
ctl bits (mostly using atomic boolean method getAndBitwiseOr),
and ensure that a task is running. (The corresponding consumer
side actions are in method consume.)  To avoid starting a new
task on each action, ctl also includes a keep-alive bit
(ACTIVE) that is refreshed if needed on producer actions.
(Maintaining agreement about keep-alives requires most atomic
updates to be full SC/Volatile strength, which is still much
cheaper than using one task per item.)  Error signals
additionally null out items and/or fields to reduce termination
latency.  The cancel() method is supported by treating as ERROR
but suppressing onError signal.

Support for blocking also exploits the fact that there is only
one possible waiter. ManagedBlocker-compatible control fields
are placed in this class itself rather than in wait-nodes.
Blocking control relies on the "waiting" and "waiter"
fields. Producers set them before trying to block. Signalling
unparks and clears fields. If the producer and/or consumer are
using a ForkJoinPool, the producer attempts to help run
consumer tasks via ForkJoinPool.helpAsyncBlocker before
blocking.

Usages of this class may encounter any of several forms of
memory contention. We try to ameliorate across them without
unduly impacting footprints in low-contention usages where it
isn't needed. Buffer arrays start out small and grow only as
needed.  The class uses @Contended and heuristic field
declaration ordering to reduce false-sharing memory contention
across instances of BufferedSubscription (as in, multiple
subscribers per publisher).  We additionally segregate some
fields that would otherwise nearly always encounter cache line
contention among producers and consumers. To reduce contention
across time (vs space), consumers only periodically update
other fields (see method takeItems), at the expense of possibly
staler reporting of lags and demand (bounded at 12.5% == 1/8
capacity) and possibly more atomic operations.

Other forms of imbalance and slowdowns can occur during startup
when producer and consumer methods are compiled and/or memory
is allocated at different rates.  This is ameliorated by
artificially subdividing some consumer methods, including
isolation of all subscriber callbacks.  This code also includes
typical power-of-two array screening idioms to avoid compilers
generating traps, along with the usual SSA-based inline
assignment coding style. Also, all methods and fields have
default visibility to simplify usage by callers.

可调整大小的基于数组的环形缓冲区,具有集成控制功能:可在元素可用时启动消费者任务。缓冲区算法专门用于最多一个并发生产者和消费者的情况,缓冲区大小为2的幂。它主要依赖于next数组槽的原子操作(CAS或getAndSet)来放置或取一个元素,分别只由生产者和消费者更新的“tail”和“head”索引。

我们在内部确保在任何给定时间最多只有一个活跃的消费者任务。publisher通过其锁定保证只有单个生产者。生产者和消费者之间的Sync依赖于volatile字段“ctl”、“demand”和“waiting”(以及元素访问)。其他变量以普通模式访问,依赖于外部排序和排除,和/或将它们包含在其他volatile访问中。通过跟踪生产者的单线程所有权(以偏向锁定的方式)来避免一些原子操作。

使用字段“ctl”管理执行控制和协议状态。订阅、关闭、请求和取消方法设置ctl位(主要使用原子布尔方法getAndBitwiseOr),并确保任务正在运行。 (相应的消费者端操作在方法consume中。)为了避免在每个操作上启动新任务,ctl还包括keep-alive位(ACTIVE):如果需要生producer操作则刷新。 (保持关于keep-alives的协议要求大多数原子更新都是完整的SC /Volatile强度,这仍然比每个元素使用一个任务便宜得多。)错误信号另外使items和/或字段为空(null out)以减少终止延迟。cancel()方法支持被当作ERROR处理但抑制onError信号。

对阻塞的支持也利用了只有一个可能的等待线程的事实。 ManagedBlocker兼容的控制字段放在此类本身而不是等待节点中。阻止控制依赖于“waiting”和“waiter”字段。生产者在试图阻塞之前设置它们。Signalling unparks并清除字段。如果生产者和/或消费者使用ForkJoinPool,生产者会尝试在阻塞之前通过ForkJoinPool.helpAsyncBlocker帮助运行消费者任务。

此类的用法可能会遇到几种形式的内存争用。我们试图改善它们,而不会在不需要的低争用惯例中过度影响脚印。缓冲数组从小开始,只在需要时增长。该类使用@Contended和heuristic字段声明排序来减少BufferedSubscription实例之间的错误共享内存争用(如每个发布者的多个订阅者)。我们另外隔离了一些字段,否则这些字段几乎总是会遇到生产者和消费者之间的缓存行争用。为了减少跨时间(与空间相比)的争用,消费者只是定期更新其他字段(请参阅方法takeItems),代价是滞后和需求的可能的staler报告(限制在12.5%== 1/8容量)和可能更多的原子操作。

当编译生产者和消费者方法和/或以不同的速率分配存储器时,在启动期间可能发生其他形式的不平衡和减速。通过人为地细分一些消费者方法,包括隔离所有subscriber回调,可以改善这一点。该代码还包括典型的二次幂数组筛选惯用法,以避免编译器生成陷阱,以及通常的基于SSA的内联分配编码样式。此外,所有方法和字段都具有默认可见性,以简化调用者的使用。

2.1 onSubscribe

        /**
         * Sets the given control bits, starting task if not running or closed.
         * @param bits state bits, assumed to include RUN but not CLOSED
         */
        final void startOnSignal(int bits) {
            if ((ctl & bits) != bits &&
                (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
                tryStart();
        }

        final void onSubscribe() {
            startOnSignal(RUN | ACTIVE);
        }
        /**
         * Tries to start consumer task. Sets error state on failure.
         */
        final void tryStart() {
            try {
                Executor e;
                ConsumerTask<T> task = new ConsumerTask<T>(this);
                if ((e = executor) != null)   // skip if disabled on error
                    e.execute(task);
            } catch (RuntimeException | Error ex) {
                getAndBitwiseOrCtl(ERROR | CLOSED);
                throw ex;
            }
        }

可知,onSubscirbe会尝试启动线程池ForkJoinPool来执行ConsumerTask任务。

2.2 consume

        // Consumer task actions

        /**
         * Consumer loop, called from ConsumerTask, or indirectly when
         * helping during submit.
         */
        final void consume() {
            Subscriber<? super T> s;
            if ((s = subscriber) != null) {          // hoist checks
                subscribeOnOpen(s);
                long d = demand;
                for (int h = head, t = tail;;) {
                    int c, taken; boolean empty;
                    if (((c = ctl) & ERROR) != 0) {
                        closeOnError(s, null);
                        break;
                    }
                    else if ((taken = takeItems(s, d, h)) > 0) {
                        head = h += taken;
                        d = subtractDemand(taken);
                    }
                    else if ((d = demand) == 0L && (c & REQS) != 0)
                        weakCasCtl(c, c & ~REQS);    // exhausted demand
                    else if (d != 0L && (c & REQS) == 0)
                        weakCasCtl(c, c | REQS);     // new demand
                    else if (t == (t = tail)) {      // stability check
                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                            closeOnComplete(s);      // end of stream
                            break;
                        }
                        else if (empty || d == 0L) {
                            int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                            if (weakCasCtl(c, c & ~bit) && bit == RUN)
                                break;               // un-keep-alive or exit
                        }
                    }
                }
            }
        }

2.2.1 subscribeOnOpen

        /**
         * Issues subscriber.onSubscribe if this is first signal.
         */
        final void subscribeOnOpen(Subscriber<? super T> s) {
            if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
                consumeSubscribe(s);
        }

        final void consumeSubscribe(Subscriber<? super T> s) {
            try {
                if (s != null) // ignore if disabled
                    s.onSubscribe(this);
            } catch (Throwable ex) {
                closeOnError(s, ex);
            }
        }

这里执行任务时,会首先调用订阅者Subscriber.onSubscirbe方法。根据响应式编程-FlowApi可知:

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
        public final void request(long n) {
            if (n > 0L) {
                for (;;) {
                    long p = demand, d = p + n;  // saturate
                    if (casDemand(p, d < p ? Long.MAX_VALUE : d))
                        break;
                }
                startOnSignal(RUN | ACTIVE | REQS);
            }
            else
                onError(new IllegalArgumentException(
                            "non-positive subscription request"));
        }

request纯粹更新该订阅者的需求。

2.2.2 takeItems

        /**
         * Consumes some items until unavailable or bound or error.
         *
         * @param s subscriber
         * @param d current demand
         * @param h current head
         * @return number taken
         */
        final int takeItems(Subscriber<? super T> s, long d, int h) {
            Object[] a;
            int k = 0, cap;
            if ((a = array) != null && (cap = a.length) > 0) {
                int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
                int n = (d < (long)b) ? (int)d : b;
                for (; k < n; ++h, ++k) {
                    Object x = QA.getAndSet(a, h & m, null);
                    if (waiting != 0)
                        signalWaiter();
                    if (x == null)
                        break;
                    else if (!consumeNext(s, x))
                        break;
                }
            }
            return k;
        }

        final boolean consumeNext(Subscriber<? super T> s, Object x) {
            try {
                @SuppressWarnings("unchecked") T y = (T) x;
                if (s != null)
                    s.onNext(y);
                return true;
            } catch (Throwable ex) {
                handleOnNext(s, ex);
                return false;
            }
        }

        /**
         * Processes exception in Subscriber.onNext.
         */
        final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
            BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
            try {
                if ((h = onNextHandler) != null)
                    h.accept(s, ex);
            } catch (Throwable ignore) {
            }
            closeOnError(s, ex);
        }


        // Blocking support

        /**
         * Unblocks waiting producer.
         */
        final void signalWaiter() {
            Thread w;
            waiting = 0;
            if ((w = waiter) != null)
                LockSupport.unpark(w);
        }

根据响应式编程-FlowApi可知:

    @Override
    public void onNext(final Integer magazineNumber) {
        if (magazineNumber != nextMagazineExpected) {
            IntStream.range(nextMagazineExpected, magazineNumber).forEach(
                    (msgNumber) ->
                            log("Oh no! I missed the magazine " + msgNumber)
            );
            // Catch up with the number to keep tracking missing ones
            nextMagazineExpected = magazineNumber;
        }
        log("Great! I got a new magazine: " + magazineNumber);
        takeSomeRest();
        nextMagazineExpected++;
        totalRead++;

        log("I'll get another magazine now, next one should be: " +
                nextMagazineExpected);
        subscription.request(1);
    }

3.ConsumerTask

    /**
     * A task for consuming buffer items and signals, created and
     * executed whenever they become available. A task consumes as
     * many items/signals as possible before terminating, at which
     * point another task is created when needed. The dual Runnable
     * and ForkJoinTask declaration saves overhead when executed by
     * ForkJoinPools, without impacting other kinds of Executors.
     */
    @SuppressWarnings("serial")
    static final class ConsumerTask<T> extends ForkJoinTask<Void>
        implements Runnable, CompletableFuture.AsynchronousCompletionTask {
        final BufferedSubscription<T> consumer;
        ConsumerTask(BufferedSubscription<T> consumer) {
            this.consumer = consumer;
        }
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { consumer.consume(); return false; }
        public final void run() { consumer.consume(); }
    }

这里调用BufferedSubscription.consume方法。

4.subscribe

    public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
        int max = maxBufferCapacity; // allocate initial array
        Object[] array = new Object[max < INITIAL_CAPACITY ?
                                    max : INITIAL_CAPACITY];
        BufferedSubscription<T> subscription =
            new BufferedSubscription<T>(subscriber, executor, onNextHandler,
                                        array, max);
        synchronized (this) {
            if (!subscribed) {
                subscribed = true;
                owner = Thread.currentThread();
            }
            for (BufferedSubscription<T> b = clients, pred = null;;) {
                if (b == null) {
                    Throwable ex;
                    subscription.onSubscribe();
                    if ((ex = closedException) != null)
                        subscription.onError(ex);
                    else if (closed)
                        subscription.onComplete();
                    else if (pred == null)
                        clients = subscription;
                    else
                        pred.next = subscription;
                    break;
                }
                BufferedSubscription<T> next = b.next;
                if (b.isClosed()) {   // remove
                    b.next = null;    // detach
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else if (subscriber.equals(b.subscriber)) {
                    b.onError(new IllegalStateException("Duplicate subscribe"));
                    break;
                }
                else
                    pred = b;
                b = next;
            }
        }
    }
    /**
     * Clients (BufferedSubscriptions) are maintained in a linked list
     * (via their "next" fields). This works well for publish loops.
     * It requires O(n) traversal to check for duplicate subscribers,
     * but we expect that subscribing is much less common than
     * publishing. Unsubscribing occurs only during traversal loops,
     * when BufferedSubscription methods return negative values
     * signifying that they have been closed.  To reduce
     * head-of-line blocking, submit and offer methods first call
     * BufferedSubscription.offer on each subscriber, and place
     * saturated ones in retries list (using nextRetry field), and
     * retry, possibly blocking or dropping.
     */
    BufferedSubscription<T> clients;

5.offer

    /**
     * Publishes the given item, if possible, to each current subscriber
     * by asynchronously invoking its {@link
     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
     * resources for any subscription are unavailable, up to the
     * specified timeout or until the caller thread is interrupted, at
     * which point the given handler (if non-null) is invoked, and if it
     * returns true, retried once. (The drop handler may distinguish
     * timeouts from interrupts by checking whether the current thread
     * is interrupted.)  Other calls to methods in this class by other
     * threads are blocked while the handler is invoked.  Unless
     * recovery is assured, options are usually limited to logging the
     * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
     * onError} signal to the subscriber.
     *
     * <p>This method returns a status indicator: If negative, it
     * represents the (negative) number of drops (failed attempts to
     * issue the item to a subscriber). Otherwise it is an estimate of
     * the maximum lag (number of items submitted but not yet
     * consumed) among all current subscribers. This value is at least
     * one (accounting for this submitted item) if there are any
     * subscribers, else zero.
     *
     * <p>If the Executor for this publisher throws a
     * RejectedExecutionException (or any other RuntimeException or
     * Error) when attempting to asynchronously notify subscribers, or
     * the drop handler throws an exception when processing a dropped
     * item, then this exception is rethrown.
     *
     * @param item the (non-null) item to publish
     * @param timeout how long to wait for resources for any subscriber
     * before giving up, in units of {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     * {@code timeout} parameter
     * @param onDrop if non-null, the handler invoked upon a drop to a
     * subscriber, with arguments of the subscriber and item; if it
     * returns true, an offer is re-attempted (once)
     * @return if negative, the (negative) number of drops; otherwise
     * an estimate of maximum lag
     * @throws IllegalStateException if closed
     * @throws NullPointerException if item is null
     * @throws RejectedExecutionException if thrown by Executor
     */
    public int offer(T item, long timeout, TimeUnit unit,
                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        long nanos = unit.toNanos(timeout);
        // distinguishes from untimed (only wrt interrupt policy)
        if (nanos == Long.MAX_VALUE) --nanos;
        return doOffer(item, nanos, onDrop);
    }
    /**
     * Common implementation for all three forms of submit and offer.
     * Acts as submit if nanos == Long.MAX_VALUE, else offer.
     */
    private int doOffer(T item, long nanos,
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        if (item == null) throw new NullPointerException();
        int lag = 0;
        boolean complete, unowned;
        synchronized (this) {
            Thread t = Thread.currentThread(), o;
            BufferedSubscription<T> b = clients;
            if ((unowned = ((o = owner) != t)) && o != null)
                owner = null;                     // disable bias
            if (b == null)
                complete = closed;
            else {
                complete = false;
                boolean cleanMe = false;
                BufferedSubscription<T> retries = null, rtail = null, next;
                do {
                    next = b.next;
                    int stat = b.offer(item, unowned);
                    if (stat == 0) {              // saturated; add to retry list
                        b.nextRetry = null;       // avoid garbage on exceptions
                        if (rtail == null)
                            retries = b;
                        else
                            rtail.nextRetry = b;
                        rtail = b;
                    }
                    else if (stat < 0)            // closed
                        cleanMe = true;           // remove later
                    else if (stat > lag)
                        lag = stat;
                } while ((b = next) != null);

                if (retries != null || cleanMe)
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
            }
        }
        if (complete)
            throw new IllegalStateException("Closed");
        else
            return lag;
    }
    /**
     * Helps, (timed) waits for, and/or drops buffers on list; returns
     * lag or negative drops (for use in offer).
     */
    private int retryOffer(T item, long nanos,
                           BiPredicate<Subscriber<? super T>, ? super T> onDrop,
                           BufferedSubscription<T> retries, int lag,
                           boolean cleanMe) {
        for (BufferedSubscription<T> r = retries; r != null;) {
            BufferedSubscription<T> nextRetry = r.nextRetry;
            r.nextRetry = null;
            if (nanos > 0L)
                r.awaitSpace(nanos);
            int stat = r.retryOffer(item);
            if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
                stat = r.retryOffer(item);
            if (stat == 0)
                lag = (lag >= 0) ? -1 : lag - 1;
            else if (stat < 0)
                cleanMe = true;
            else if (lag >= 0 && stat > lag)
                lag = stat;
            r = nextRetry;
        }
        if (cleanMe)
            cleanAndCount();
        return lag;
    }

调用BufferedSubscription.offer:

        /**
         * Tries to add item and start consumer task if necessary.
         * @return negative if closed, 0 if saturated, else estimated lag
         */
        final int offer(T item, boolean unowned) {
            Object[] a;
            int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
            int t = tail, i = t & (cap - 1), n = t + 1 - head;
            if (cap > 0) {
                boolean added;
                if (n >= cap && cap < maxCapacity) // resize
                    added = growAndoffer(item, a, t);
                else if (n >= cap || unowned)      // need volatile CAS
                    added = QA.compareAndSet(a, i, null, item);
                else {                             // can use release mode
                    QA.setRelease(a, i, item);
                    added = true;
                }
                if (added) {
                    tail = t + 1;
                    stat = n;
                }
            }
            return startOnOffer(stat);
        }

6.总结

上一篇下一篇

猜你喜欢

热点阅读