Reactor学习:二、生产者

2020-06-30  本文已影响0人  睦月MTK

声明:


一、Flux和Mono

Flux<T>Mono<T>Reactor中非常重要的两个生产者,它们都继承了Publisher<T>FluxMono都代表一个响应式序列,但是不同的是,Flux代表0—N个元素,Mono代表0—1个元素。其实称Mono为一个序列并不准确,称之为“结果”可能会更好。试想web中的应用场景,一个请求必然只生成一个响应,所以使用Mono<HttpResponse>比使用Flux<HttpResponse>语义上更为确切。


二、生产序列的方法
Flux<String> stringFlux = Flux.generate(() -> 0 , (state,sink) -> {
    sink.next("str"+state);
    //sink.next("str"+state);
    LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
    if(state++ == 10) sink.complete();
    return state;
},System.out::println);

这段代码的作用生成一个从"str0"一直到"str10"的序列,且每间隔一秒生成一个。在sink#complete方法被调用之前或者出现异常之前,该回调方法(即第二个参数BiFunction那一段)会被一直调用。需要注意的是sink#next在一次回调中只能被调用一次,否则将会报错,导致序列生成的中止。第三个参数可以不要,其作用是在中止时(包括异常带来的中止或者Disposable#dispose取消任务带来的中止)将当前状态(也就是state)进行一定的操作,代码中是直接将其打印了出来

Flux<String> stringFlux = Flux.create(sink -> {
    //onRequest在每次接收到request的时候都会被调用
    sink.onRequest(n -> {
        System.out.println("on request:"+n);
    }).onCancel(() -> {
        System.out.println("cancel");
        //onDispose在完成、错误、取消时候调用,取消的优先级低于onCancel
    }).onDispose(() -> {
        System.out.println("dispose");
    });
    myEventContext.addMyEventListener(new SinkEventListener() {
        @Override
        public void onCompleteEvent(SinkCompleteEvent event) {
            sink.complete();
        }

        @Override
        public void onNextEvent(SinkNextEvent event) {
            sink.next(event.getMessage());
        }
    });
}, FluxSink.OverflowStrategy.DROP);

在这段代码中每当有一个SinkNextEvent事件触发时,就会执行一次sink#next,当触发了onCompleteEvent事件时,就会执行sink#complete。第二个参数FluxSink.OverflowStrategy.xxx指定了背压(backpressure,即下游对上游的反馈控制,避免爆发“洪水”)的参数,背压参数有下列几种:
IGNORE 忽视反馈,我行我素,可能会抛出IllegalStateException
ERROR 超出下游接收能力时,抛出IllegalStateException
DROP 超出下游接收能力时(下游不准备接收时),丢弃掉上游数据
LATEST 不抛弃掉上游数据,但是只取最近的数据
BUFFER 默认值,缓存所有的超量数据,要注意超出内存限制的风险
注意:不论选择哪个,在下游第一次发出请求之前,上游的所有数据均会被丢弃。
额外需要注意的是,在sink#complete发生之后,如果还在使用sink#next来添加数据,该数据会被自动丢弃,并打印出类似[DEBUG] (pool-1-thread-1) onNextDropped: 0.7328237206959655的debug信息

Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).handle((item , sink) -> {
    if(item % 2 != 0) sink.next(item);
});

该段代码会生成1、3、5...的奇数序列,每隔2秒生成一次。item指的是上游发出的数据


参考文档:
[1] Reactor api doc
[2] Reactor reference doc

上一篇下一篇

猜你喜欢

热点阅读