Reactor学习:三、订阅

2020-07-01  本文已影响0人  睦月MTK

声明:


一、subscribe方法

注意:所有的操作只有在订阅的那一刻才开始进行!!!
subscribe方法有两种常用的形式:

public class CustomSubsriber<T> extends BaseSubscriber<T> {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("subscribed");
        request(1);
    }

    @Override
    protected void hookOnNext(T value) {
        System.out.println("get value:" + value);
        LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
        request(1);
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("completed");
    }
}

二、背压——流控

Reactor中,下游想要控制上游的流速,是通过request来实现的,request的总数代表着下游当前的需求量,比如subscriber.request(1)就代表下游需要一个数据,多了不要。如果你把request的值设为Long.MAX_VALUE,则它意味着下游可以接收无限制的数据,比如Mono#blockFlux#blockFirstFlux#blockLast就默认无限制接收数据。对了,这三个方法也是一种订阅操作,使用它们会激活整个订阅过程,Mono#blockFlux#blockFirst均表示接收流中第一个数据并返回,如果在等待中接收到了完成信号则返回null,同理Flux#blockLast表示只接收最后一个数据,其会一直等待完成信号的到来。
值得注意的是:request并不一定恒定的,它可能会被整个上游中的某个操作修改,比如stringFlux.buffer(3).subscriber(null,null,null,sub -> {sub.request(2)})中,buffer(3)会将request进一步修改为6,因为sub.request(2)的2个请求是请求buffer(3)的输出结果的,而一个buffer(3)结果需要其上游的3个数据,故request被更改为了6


三、异步

Reactor中,如果不特别指定异步操作的话,那么整个流的发生到订阅过程全部会执行在subscribe那个线程中。所以最简单的异步使用Reactor的方法就是新建一个线程,并在其中执行subscribe,比如:

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); 

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> 
          System.out.println(v + Thread.currentThread().getName()) 
      )
  )
  t.start();
  t.join();
}

其结果为:

hello thread Thread-0

当然Reactor其实提供更加简便的异步操作方式,其中比较常用的就是publishOnSubscribeOn两个方法了,这两个方法都需要一个Scheduler类型的参数,它控制着操作的执行模式以及执行执行位置,单从表现上来看,倒是有点像ExecutorService。创建Scheduler你需要使用到Schedulers工厂类,里面定义了许多不同类型的Scheduler
1)Schedulers.immediate():直接在当前线程中立刻执行
2)Schedulers.single()/newSingle():提供一个单线程线程池以供操作,前者是一个固定的定义好的单线程线程池,后者你可以使用它来创建新的单线程线程池
3)Schedulers.elastic():相当于提供了一个CachedThreadPool
4)Schedulers.parallel():相当于提供了一个和Cpu核心数一样多的核心线程数的线程池
5)Schedulers.fromExecutorService():从现有的ExecutorService中引入

看个例子:

final Flux<String> flux = Flux
        .range(1, 2)
        .map(i -> {
            System.out.println("map1:"+Thread.currentThread().getName());
            LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
            return 10 + i;
        })
        .subscribeOn(s)
        .map(i -> {
            System.out.println("map2:"+Thread.currentThread().getName());
            LockSupport.parkNanos(Duration.ofSeconds(2).toNanos());
            return "value " + i;
        }).publishOn(Schedulers.single())
        .map(i -> {
            System.out.println("map3:"+Thread.currentThread().getName());
            return "mtk:"+i;
        });

new Thread(() -> flux.subscribe(System.out::println)).start();

结果如下:

map1:parallel-scheduler-1
map2:parallel-scheduler-1
map1:parallel-scheduler-1
map3:single-1
mtk:value 11
map2:parallel-scheduler-1
map3:single-1
mtk:value 12

从结果中我们可以发现整个流在没有运行在subscribe方法调用时所在的线程中,因为有subscribeOn的缘故,整个流运行在parallel-scheduler-1线程中,但是在map3操作前,有个publishOn,其使得其后的操作运行在了single-1线程中


参考文档:
[1] Reactor api doc
[2] Reactor reference doc

上一篇 下一篇

猜你喜欢

热点阅读