Reactor学习:二、生产者
声明:
一、Flux和Mono
Flux<T>
和Mono<T>
是Reactor
中非常重要的两个生产者,它们都继承了Publisher<T>
。Flux
和Mono
都代表一个响应式序列,但是不同的是,Flux
代表0—N个元素,Mono
代表0—1个元素。其实称Mono
为一个序列并不准确,称之为“结果”可能会更好。试想web中的应用场景,一个请求必然只生成一个响应,所以使用Mono<HttpResponse>
比使用Flux<HttpResponse>
语义上更为确切。
二、生产序列的方法
-
Flux#just(T...)
、Mono#just(T)
、Mono#justOrEmpty(T)
just
方法就是显式指定序列,类似Stream#of
-
Flux#from(Publisher)
、Flux#fromArray
、Flux#fromIterable
、Flux#range
、Flux#fromStream(Supplier<Stream>)
从Publisher、数组、集合、数据范围、流中生产序列 -
Mono#from(Publisher)
、Mono#fromSupplier
、Mono#fromRunnable
、Mono#fromCallable
、Mono#fromFuture
从其他地方获取结果,其中Mono#from(Publisher)
是截取Publisher
第一个元素 -
empty
、error
、never
1)empty
指的是一个直接完成的序列
2)error
指的是一个直接报错的序列
3)never
值的是一个不做任何事情的序列,比如发出数据、报告完成、报告错误等等 -
Flux#generate
Flux#generate
是一个同步的简易的程序化生成序列的方法,你需要提供一个初始值以及一个控制sink
的回调方法,其完整参数序列是generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
多说无益,看段代码就懂了
Flux<String> stringFlux = Flux.generate(() -> 0 , (state,sink) -> {
sink.next("str"+state);
//sink.next("str"+state);
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
if(state++ == 10) sink.complete();
return state;
},System.out::println);
这段代码的作用生成一个从"str0"一直到"str10"的序列,且每间隔一秒生成一个。在
sink#complete
方法被调用之前或者出现异常之前,该回调方法(即第二个参数BiFunction
那一段)会被一直调用。需要注意的是sink#next
在一次回调中只能被调用一次,否则将会报错,导致序列生成的中止。第三个参数可以不要,其作用是在中止时(包括异常带来的中止或者Disposable#dispose
取消任务带来的中止)将当前状态(也就是state)进行一定的操作,代码中是直接将其打印了出来
-
Flux#create
Flux#create
比之Flux#generate
要高级一点,其并不需要一个初始值,而且它支持多线程同时触发回调,因此,与监听机制结合在一起会得到意想不到的效果。
Flux<String> stringFlux = Flux.create(sink -> {
//onRequest在每次接收到request的时候都会被调用
sink.onRequest(n -> {
System.out.println("on request:"+n);
}).onCancel(() -> {
System.out.println("cancel");
//onDispose在完成、错误、取消时候调用,取消的优先级低于onCancel
}).onDispose(() -> {
System.out.println("dispose");
});
myEventContext.addMyEventListener(new SinkEventListener() {
@Override
public void onCompleteEvent(SinkCompleteEvent event) {
sink.complete();
}
@Override
public void onNextEvent(SinkNextEvent event) {
sink.next(event.getMessage());
}
});
}, FluxSink.OverflowStrategy.DROP);
在这段代码中每当有一个
SinkNextEvent
事件触发时,就会执行一次sink#next
,当触发了onCompleteEvent
事件时,就会执行sink#complete
。第二个参数FluxSink.OverflowStrategy.xxx
指定了背压(backpressure,即下游对上游的反馈控制,避免爆发“洪水”)的参数,背压参数有下列几种:
IGNORE
忽视反馈,我行我素,可能会抛出IllegalStateException
ERROR
超出下游接收能力时,抛出IllegalStateException
DROP
超出下游接收能力时(下游不准备接收时),丢弃掉上游数据
LATEST
不抛弃掉上游数据,但是只取最近的数据
BUFFER
默认值,缓存所有的超量数据,要注意超出内存限制的风险
注意:不论选择哪个,在下游第一次发出请求之前,上游的所有数据均会被丢弃。
额外需要注意的是,在sink#complete
发生之后,如果还在使用sink#next
来添加数据,该数据会被自动丢弃,并打印出类似[DEBUG] (pool-1-thread-1) onNextDropped: 0.7328237206959655
的debug信息
-
handle
该方法存在于Flux
以及Mono
中,比较特殊,相当于一个过滤机制,它可以承接上一个序列生成方法,将其生成的数据进行过滤后再输出,它使用的sink
是SynchronousSink
这表示sink#next
在一次回调中只能被执行一次。具体示例代码如下:
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).handle((item , sink) -> {
if(item % 2 != 0) sink.next(item);
});
该段代码会生成1、3、5...的奇数序列,每隔2秒生成一次。item指的是上游发出的数据
参考文档:
[1] Reactor api doc
[2] Reactor reference doc