Java

Reactor之发射器(Flux、Mono)创建函数

2018-11-13  本文已影响600人  Mr_1214

Flux

发射0到N个元素的异步"发射器

image

Mono

发射0到1个元素的异步"发射器

image

创建函数

以编程方式创建具有多次发射能力的Flux,
元素通过FluxSink API以同步或异步方式进行。

eg:

    Flux.create((t) -> {
            t.next("create");
            t.next("create1");
            t.complete();
        }).subscribe(System.out::println);

以编程方式创建一个的Flux,通过consumer回调逐一生成信号;generate中next只能调1次,否则会报错 reactor.core.Exceptions$ErrorCallbackNotImplemented

image

eg:

    Flux.generate(t -> {
            t.next("generate");
            //注意generate中next只能调用1次
            t.complete();
        }).subscribe(System.out::println);

创建一个Flux,它发出所提供的元素,然后完成。

image

eg:

    //单个元素
    Flux.just("just").subscribe(System.out::println);
    //多个元素
    Flux.just("just", "just1", "just2").subscribe(System.out::println);

用Flux API装饰指定的Publisher,通过Publisher创建一个Flux

image

eg:

    //Flux->Flux
        Flux.from(Flux.just("just", "just1", "just2"))
                .subscribe(System.out::println);
        //Mono->Mono
        Flux.from(Mono.just("just")).subscribe(System.out::println);

创建一个Flux,它发出包含在提供的数组中的项。

image

eg:

    Flux.fromArray(new String[] { "arr", "arr", "arr", "arr" })
                .subscribe(System.out::println);

创建一个个Flux,它发出所提供的Iterable中包含的项。将为每个subscriber创建一个新的Iterable。

image

eg:

Set<String> v = new HashSet<>();
    v.add("1");
    v.add("2");
    v.add("3");
Flux.fromIterable(() -> v.iterator()).subscribe(System.out::println);

创建一个Flux,它发出所提供的Stream中包含的项。请记住,Stream不能被重新使用,这可能是有问题的。多订阅或重订阅的情况(如repeat或retry)Stream是closed由操作员取消,错误或完成。

image

每当对得到的Flux进行Subscription时,延迟提供Publisher,因此实际的源实例化被推迟,直到每个订阅和Supplier可以创建订阅者特定的实例。
但是,如果供应商没有生成新的实例,这个操作符将有效地从Publisher起作用。

image

eg:

Flux.defer(() -> Flux.just("just", "just1", "just2"))
        .subscribe(System.out::println);

创建一个Flux,它以0开始发射长值并递增
全局计时器上指定的时间间隔。如果需求没有及时产生,一个OnError将用来发出信号。IllegalStateException详细说明无法发出的信息。在正常情况下,Flux将永远不会完成。

image

eg:

Flux.interval(Duration.of(500, ChronoUnit.MILLIS))
        .subscribe(System.out::println);
//防止程序过早退出,放一个CountDownLatch拦住
CountDownLatch latch = new CountDownLatch(1);
latch.await();

创建一个Flux,完成而不发射任何项目。

image

eg:

Flux.empty().subscribe(System.out::println);

创建一个Flux,它在订阅之后立即以指定的错误终止。

image

eg:

Flux.error(new RuntimeException()).subscribe(System.out::println);

创建一个Flux,它永远不会发出任何数据、错误或完成信号。

image

eg:

Flux.never().subscribe(System.out::println);

建立一个Flux,它只会发出一个count递增整数的序列,从start开始。也就是说,在start(包含)和start + count(排除)之间发出整数,然后完成。

image

eg:

Flux.range(0, 100).subscribe(System.out::println);
上一篇下一篇

猜你喜欢

热点阅读