Reactive Programming: the Flow API

2019-07-29  王侦

1.Official documentation

Interrelated interfaces and static methods for establishing flow-
controlled components in which Publishers produce items 
consumed by one or more Subscribers, each managed by a 
Subscription.

These interfaces correspond to the reactive-streams specification. 
They apply in both concurrent and distributed asynchronous 
settings: All (seven) methods are defined in void "one-way" 
message style. Communication relies on a simple form of flow 
control (method Flow.Subscription.request(long)) that can be used 
to avoid resource management problems that may otherwise occur 
in "push" based systems.

Examples. A Flow.Publisher usually defines its own 
Flow.Subscription implementation; constructing one in method 
subscribe and issuing it to the calling Flow.Subscriber. It publishes 
items to the subscriber asynchronously, normally using an 
Executor. For example, here is a very simple publisher that only 
issues (when requested) a single TRUE item to a single 
subscriber. Because the subscriber receives only a single item, 
this class does not use buffering and ordering control required in 
most implementations (for example SubmissionPublisher).

 class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

A Flow.Subscriber arranges that items be requested and 
processed. Items (invocations of Flow.Subscriber.onNext(T)) are 
not issued unless requested, but multiple items may be requested. 
Many Subscriber implementations can arrange this in the style of 
the following example, where a buffer size of 1 single-steps, and 
larger sizes usually allow for more efficient overlapped processing 
with less communication; for example with a value of 64, this 
keeps total outstanding requests between 32 and 64. Because 
Subscriber method invocations for a given Flow.Subscription are 
strictly ordered, there is no need for these methods to use locks or 
volatiles unless a Subscriber maintains multiple Subscriptions (in 
which case it is better to instead define multiple Subscribers, each 
with its own Subscription).

 class SampleSubscriber<T> implements Subscriber<T> {
   final Consumer<? super T> consumer;
   Subscription subscription;
   final long bufferSize;
   long count;
   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
     this.bufferSize = bufferSize;
     this.consumer = consumer;
   }
   public void onSubscribe(Subscription subscription) {
     long initialRequestSize = bufferSize;
     count = bufferSize - bufferSize / 2; // re-request when half consumed
     (this.subscription = subscription).request(initialRequestSize);
   }
   public void onNext(T item) {
     if (--count <= 0)
       subscription.request(count = bufferSize - bufferSize / 2);
     consumer.accept(item);
   }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
 }
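
The "between 32 and 64" figure can be checked with a little arithmetic. The sketch below is not part of the javadoc: it replays SampleSubscriber's bookkeeping in a plain loop, with no real publisher, assuming that every requested item is eventually delivered one at a time. The class and method names (RequestWindowDemo, outstandingRange) are made up for this illustration. The low-water mark is sampled right after a delivery, before any re-request.

```java
class RequestWindowDemo {

    // Replays SampleSubscriber's counters for `items` deliveries.
    // Returns {lowest, highest} number of requested-but-undelivered items.
    static long[] outstandingRange(long bufferSize, int items) {
        long outstanding = bufferSize;             // onSubscribe: request(bufferSize)
        long count = bufferSize - bufferSize / 2;  // deliveries left before re-requesting
        long lowest = outstanding;
        long highest = outstanding;
        for (int i = 0; i < items; i++) {
            outstanding--;                         // one onNext delivery
            lowest = Math.min(lowest, outstanding);
            if (--count <= 0) {
                count = bufferSize - bufferSize / 2;
                outstanding += count;              // onNext: request(count)
            }
            highest = Math.max(highest, outstanding);
        }
        return new long[] {lowest, highest};
    }

    public static void main(String[] args) {
        long[] r64 = outstandingRange(64, 1_000);
        System.out.println("bufferSize 64: " + r64[0] + " to " + r64[1]); // 32 to 64
        long[] r1 = outstandingRange(1, 10);
        System.out.println("bufferSize 1: " + r1[0] + " to " + r1[1]);    // 0 to 1
    }
}
```

With a buffer size of 1 the subscriber asks for one item, receives it, and only then asks for the next, which is the single-stepping the javadoc mentions.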

The default value of defaultBufferSize() may provide a useful 
starting point for choosing request sizes and capacities in Flow 
components based on expected rates, resources, and usages. Or, 
when flow control is never needed, a subscriber may initially 
request an effectively unbounded number of items, as in:

 class UnboundedSubscriber<T> implements Subscriber<T> {
   public void onSubscribe(Subscription subscription) {
     subscription.request(Long.MAX_VALUE); // effectively unbounded
   }
   public void onNext(T item) { use(item); }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
   void use(T item) { ... }
 }
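
To see an unbounded subscriber run end to end, here is a minimal sketch that pairs one with the JDK's SubmissionPublisher. It is an illustration rather than javadoc code: the class name UnboundedDemo, the helper publishAndCollect, and the 5-second wait are all assumptions chosen for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class UnboundedDemo {

    // Publishes 1..n and returns what an unbounded subscriber received.
    static List<Integer> publishAndCollect(int n) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // effectively unbounded
            }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable ex) { finished.countDown(); }
            @Override public void onComplete() { finished.countDown(); }
        });

        for (int i = 1; i <= n; i++) {
            publisher.submit(i); // blocks only while the subscriber's buffer is full
        }
        publisher.close(); // onComplete follows the items that are already buffered

        try {
            finished.await(5, TimeUnit.SECONDS); // wait for the asynchronous delivery
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(5)); // [1, 2, 3, 4, 5]
    }
}
```

Because the subscriber requests Long.MAX_VALUE once, it never calls request again; all back-pressure is left to the publisher's buffer.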

2.Source code


3.Pull, Push, and Pull-Push modes (data-processing modes)

4.Flow and Stream

In short, Flow is mainly concerned with communication, while Stream is mainly concerned with data processing.
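
One way to make the difference concrete is to compute the same sum both ways. The sketch below is only an illustration under assumed names (FlowVersusStream, sumWithStream, sumWithFlow). The Stream version returns the result directly to the caller. The Flow version hands items to a consumer running on another thread and has to wait for the completion signal before the result is ready.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class FlowVersusStream {

    // Stream: a data-processing pipeline evaluated by the calling thread.
    static int sumWithStream(int n) {
        return IntStream.rangeClosed(1, n).map(x -> x * 2).sum();
    }

    // Flow: items are passed as messages to a subscriber on another thread.
    static int sumWithFlow(int n) {
        AtomicInteger sum = new AtomicInteger();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        // consume() subscribes the given function and requests items on its behalf
        CompletableFuture<Void> done = publisher.consume(x -> sum.addAndGet(x * 2));
        for (int i = 1; i <= n; i++) {
            publisher.submit(i);
        }
        publisher.close(); // triggers onComplete, which completes `done`
        done.join();       // wait until every item has been processed
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumWithStream(5)); // 30
        System.out.println(sumWithFlow(5));   // 30
    }
}
```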

5.Magazine publisher example

Summary of the scenarios:

In the end, option d was chosen.

package com.wz.concurrent.other;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * @Author : Wang Zhen.
 * @Date : Created in 14:50 2019/7/29
 * @Description :
 * @Modified By   :
 * @Version :
 */
public class MagazineSubscriber implements Flow.Subscriber<Integer> {

    public static final String JACK = "Jack";
    public static final String PETE = "Pete";

//    private static final Logger log = LoggerFactory.
//            getLogger(MagazineSubscriber.class);

    private final long sleepTime;
    private final String subscriberName;
    private Flow.Subscription subscription;
    private int nextMagazineExpected;
    private int totalRead;

    MagazineSubscriber(final long sleepTime, final String subscriberName) {
        this.sleepTime = sleepTime;
        this.subscriberName = subscriberName;
        this.nextMagazineExpected = 1;
        this.totalRead = 0;
    }

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Integer magazineNumber) {
        if (magazineNumber != nextMagazineExpected) {
            IntStream.range(nextMagazineExpected, magazineNumber).forEach(
                    (msgNumber) ->
                            log("Oh no! I missed the magazine " + msgNumber)
            );
            // Catch up with the number to keep tracking missing ones
            nextMagazineExpected = magazineNumber;
        }
        log("Great! I got a new magazine: " + magazineNumber);
        takeSomeRest();
        nextMagazineExpected++;
        totalRead++;

        log("I'll get another magazine now, next one should be: " +
                nextMagazineExpected);
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log("Oops I got an error from the Publisher: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log("Finally! I completed the subscription, I got in total " +
                totalRead + " magazines.");
    }

    private void log(final String logMessage) {
        System.out.println("<=========== [" + subscriberName + "] : " + logMessage);
    }

    public String getSubscriberName() {
        return subscriberName;
    }

    private void takeSomeRest() {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Methods in MagazineSubscriber:

package com.wz.concurrent.other;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * @Author : Wang Zhen.
 * @Date : Created in 14:53 2019/7/29
 * @Description :
 * @Modified By   :
 * @Version :
 */
public class ReactiveFlowApp {

    private static final int NUMBER_OF_MAGAZINES = 20;
    private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
//    private static final Logger log =
//            LoggerFactory.getLogger(ReactiveFlowApp.class);

    public static void main(String[] args) throws Exception {
        final ReactiveFlowApp app = new ReactiveFlowApp();

        System.out.println("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
                "important in this case.");
        app.magazineDeliveryExample(100L, 100L, 8);

        System.out.println("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
                "size on the publisher's side to keep all items until they're picked up");
        app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

        System.out.println("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
                "size on the publisher's side so it's important to keep the slow " +
                "subscriber under control");
        app.magazineDeliveryExample(1000L, 3000L, 8);

    }

    void magazineDeliveryExample(final long sleepTimeJack,
                                 final long sleepTimePete,
                                 final int maxStorageInPO) throws Exception {
        final SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);

        final MagazineSubscriber jack = new MagazineSubscriber(
                sleepTimeJack,
                MagazineSubscriber.JACK
        );
        final MagazineSubscriber pete = new MagazineSubscriber(
                sleepTimePete,
                MagazineSubscriber.PETE
        );

        publisher.subscribe(jack);
        publisher.subscribe(pete);

        System.out.println("Printing " + NUMBER_OF_MAGAZINES +
                " magazines per subscriber, with room in publisher for "
                + maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
                " seconds to consume each magazine.");
        IntStream.rangeClosed(1, NUMBER_OF_MAGAZINES).forEach((number) -> {
            System.out.println("Offering magazine " + number + " to consumers");
            final int lag = publisher.offer(
                    number,
                    MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
                    TimeUnit.SECONDS,
                    (subscriber, msg) -> {
                        subscriber.onError(
                                new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
                                        .getSubscriberName() + "! You are too slow getting magazines" +
                                        " and we don't have more space for them! " +
                                        "I'll drop your magazine: " + msg));
                        return false; // don't retry, we don't believe in second opportunities
                    });
            if (lag < 0) {
                log("Dropping " + -lag + " magazines");
            } else {
                log("The slowest consumer has " + lag +
                        " magazines in total to be picked up");
            }
        });

        // Blocks until all subscribers are done (this part could be improved
        // with latches, but this way we keep it simple)
        while (publisher.estimateMaximumLag() > 0) {
            Thread.sleep(500L);
        }

        // Closes the publisher, calling the onComplete() method on every subscriber
        publisher.close();
        // give some time to the slowest consumer to wake up and notice
        // that it's completed
        Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
    }

    private static void log(final String message) {
        System.out.println("===========> " + message);
    }

}

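The publisher is created with the Java 9 SubmissionPublisher class. As its javadoc states, it is a Publisher in the Reactive Streams sense that relies on blocking and/or dropping items for flow control when subscribers consume too slowly.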
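
The dropping behaviour can be isolated in a much smaller program than the magazine example. The sketch below is an illustration only, with assumed names (DropDemo, offerToStalledSubscriber): it uses the non-blocking offer(item, onDrop) overload, a requested buffer capacity of 2, and a subscriber that never calls request, so the buffer fills up and later offers are dropped. The exact split between accepted and dropped items depends on the capacity the implementation actually enforces, so the code counts rather than assumes.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class DropDemo {

    // Offers `total` items to a subscriber that never requests anything.
    // Returns {accepted, dropped}.
    static int[] offerToStalledSubscriber(int total) {
        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { /* no request(): stalled */ }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable ex) { }
            @Override public void onComplete() { }
        });

        int accepted = 0;
        int dropped = 0;
        for (int i = 1; i <= total; i++) {
            // The handler returns false, meaning "do not retry this item".
            int lag = publisher.offer(i, (subscriber, item) -> false);
            if (lag < 0) {
                dropped += -lag; // a negative result is the number of drops
            } else {
                accepted++;      // otherwise it is an estimate of the maximum lag
            }
        }
        publisher.close();
        return new int[] {accepted, dropped};
    }

    public static void main(String[] args) {
        int[] result = offerToStalledSubscriber(10);
        System.out.println("accepted=" + result[0] + ", dropped=" + result[1]);
    }
}
```

The magazine example uses the timed variant of offer instead, which waits up to the given timeout for space before the drop handler is invoked.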

The main() method calls the logic above three times with different parameters to simulate the three real-world scenarios described earlier.

Results:
Case 1:

### CASE 1: Subscribers are fast, buffer size is not so important in this case.
Printing 20 magazines per subscriber, with room in publisher for 8. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
<=========== [Jack] : Great! I got a new magazine: 1
<=========== [Pete] : Great! I got a new magazine: 1
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Pete] : Great! I got a new magazine: 2
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 11 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Pete] : Great! I got a new magazine: 3
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 12 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : Great! I got a new magazine: 4
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 13 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Pete] : Great! I got a new magazine: 5
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 14 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Pete] : Great! I got a new magazine: 6
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 15 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 7
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 16 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 17 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 18 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Great! I got a new magazine: 10
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 19 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 11
<=========== [Pete] : Great! I got a new magazine: 11
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 20 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Great! I got a new magazine: 12
===========> The slowest consumer has 9 magazines in total to be picked up
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 15
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Great! I got a new magazine: 16
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 17
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Great! I got a new magazine: 18
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : Finally! I completed the subscription, I got in total 20 magazines.
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.

Case 2:

### CASE 2: A slow subscriber, but a good enough buffer size on the publisher's side to keep all items until they're picked up
Printing 20 magazines per subscriber, with room in publisher for 20. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
<=========== [Jack] : Great! I got a new magazine: 1
<=========== [Pete] : Great! I got a new magazine: 1
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
===========> The slowest consumer has 10 magazines in total to be picked up
Offering magazine 11 to consumers
===========> The slowest consumer has 11 magazines in total to be picked up
Offering magazine 12 to consumers
===========> The slowest consumer has 12 magazines in total to be picked up
Offering magazine 13 to consumers
===========> The slowest consumer has 13 magazines in total to be picked up
Offering magazine 14 to consumers
===========> The slowest consumer has 14 magazines in total to be picked up
Offering magazine 15 to consumers
===========> The slowest consumer has 15 magazines in total to be picked up
Offering magazine 16 to consumers
===========> The slowest consumer has 16 magazines in total to be picked up
Offering magazine 17 to consumers
===========> The slowest consumer has 17 magazines in total to be picked up
Offering magazine 18 to consumers
===========> The slowest consumer has 18 magazines in total to be picked up
Offering magazine 19 to consumers
===========> The slowest consumer has 19 magazines in total to be picked up
Offering magazine 20 to consumers
===========> The slowest consumer has 20 magazines in total to be picked up
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 3
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : Great! I got a new magazine: 4
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : Great! I got a new magazine: 6
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 11
<=========== [Pete] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Great! I got a new magazine: 12
<=========== [Pete] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Great! I got a new magazine: 14
<=========== [Pete] : I'll get another magazine now, next one should be: 15
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 17
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Great! I got a new magazine: 18
<=========== [Pete] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Great! I got a new magazine: 20
<=========== [Pete] : I'll get another magazine now, next one should be: 21
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.
<=========== [Pete] : Finally! I completed the subscription, I got in total 20 magazines.

Case 3:

### CASE 3: A slow subscriber, and a very limited buffer size on the publisher's side so it's important to keep the slow subscriber under control
Printing 20 magazines per subscriber, with room in publisher for 8. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
<=========== [Jack] : Great! I got a new magazine: 1
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
<=========== [Pete] : Great! I got a new magazine: 1
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Pete] : Great! I got a new magazine: 2
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 10
===========> Dropping 1 magazines
Offering magazine 11 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 12 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 12
===========> Dropping 1 magazines
Offering magazine 13 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Pete] : Great! I got a new magazine: 3
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 14 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 14
===========> Dropping 1 magazines
Offering magazine 15 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : Great! I got a new magazine: 4
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 16 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 16
===========> Dropping 1 magazines
Offering magazine 17 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Pete] : Great! I got a new magazine: 5
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 18 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 18
===========> Dropping 1 magazines
Offering magazine 19 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Pete] : Great! I got a new magazine: 6
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 20 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 20
===========> Dropping 1 magazines
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Oh no! I missed the magazine 10
<=========== [Pete] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Oh no! I missed the magazine 12
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Oh no! I missed the magazine 14
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Oh no! I missed the magazine 16
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Oh no! I missed the magazine 18
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Finally! I completed the subscription, I got in total 14 magazines.
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.
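
The "Dropping 1 magazines" lines in the log above come from the drop handler passed to `SubmissionPublisher.offer`: when a subscriber's buffer is full, the handler is invoked and the item is discarded for that subscriber unless the handler returns true (in which case the offer is retried once). The following is a minimal sketch of that mechanism, not the code that produced the log. The names `DropDemo` and `LazyReader`, the buffer capacity of 8, and the single-thread executor are assumptions made for illustration. The subscriber deliberately requests nothing until all items have been offered, so the buffer must overflow.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class DropDemo {

    // A subscriber that requests nothing on subscribe, so the publisher's buffer fills up.
    static final class LazyReader implements Flow.Subscriber<Integer> {
        final CountDownLatch subscribed = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicInteger received = new AtomicInteger();
        volatile Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;          // keep it, but do not request yet
            subscribed.countDown();
        }
        @Override public void onNext(Integer item) { received.incrementAndGet(); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Offers `total` items to a subscriber with no demand; returns {received, dropped}.
    static int[] run(int total, int capacity) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        LazyReader reader = new LazyReader();
        AtomicInteger dropped = new AtomicInteger();
        try {
            try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(pool, capacity)) {
                publisher.subscribe(reader);
                for (int i = 1; i <= total; i++) {
                    // Non-blocking offer: the handler runs when the buffer is full.
                    // Returning false means "do not retry", so the item is dropped.
                    publisher.offer(i, (subscriber, item) -> {
                        dropped.incrementAndGet();
                        return false;
                    });
                }
                await(reader.subscribed);
                reader.subscription.request(Long.MAX_VALUE); // now drain what was buffered
            } // close() signals onComplete after the buffered items are delivered
            await(reader.done);
        } finally {
            pool.shutdown();
        }
        return new int[] { reader.received.get(), dropped.get() };
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the subscriber");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run(20, 8);
        System.out.println("received=" + result[0] + ", dropped=" + result[1]);
    }
}
```

Every offered item is either delivered or dropped, so the two counts always add up to the number of items offered. How many items fit before the first drop depends on how the implementation sizes its buffer, so the sketch does not rely on an exact split.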

