reactor3 flux 多个订阅者

2022-04-17  本文已影响0人  simians

在一个数据源里想要有多个订阅者消费时应该怎么做呢?

        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Flux<String> flux = Flux.fromIterable(list);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);

那么如果我想要有三个个订阅者的时候才开始消费数据源该如何做呢

        Flux<String> flux = Flux.fromIterable(list);
        ConnectableFlux<String> con = flux.publish();
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        // 手动的开启消费数据
        con.connect();

如果感觉手动开启太麻烦也可以这样

    // autoConnect(3) 表示如果订阅者达到三个 就自动开启
        Flux<String> auto = flux.publish().autoConnect(3);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);

如果感觉还不够好的,比如当有一个订阅者突然断开了,我想停止消费数据该怎么做呢

      // 如果订阅者少于三个就会停止消费数据,直到订阅者达到三个为止
        Flux<String> auto = flux.publish().refCount(3);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);

      // 如果订阅者少于三个且超过十秒没有新的订阅才会停止消费数据
        Flux<String> auto = flux.publish().refCount(3,Duration.ofSeconds(10));
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);
上一篇下一篇

猜你喜欢

热点阅读