响应式框架reactor3的 使用其一

2022-08-26  本文已影响0人  simians

响应式编程(Reactive programming) 是使用异步数据流(asynchronous data streams)进行编程。
特性:

        Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
                .log() // 打印日志
                //.limitRate(100) // 告诉上游我只能缓存100个 超过100个就会抛异常,下游每次消费100个后会告诉上游继续发送数据
                .onBackpressureBuffer(2000) // 设置背压缓存策略并设置缓存大小 当缓存超过2000 个就抛异常 这个方法会覆盖掉 limitRate
                .concatMap(x -> {
                    System.out.println(x);
                 return   Mono.delay(Duration.ofMillis(100));}) // concatMap 表示将上游的数据组装成一个flux
                .blockLast(); // 无限阻塞 知道执行完最后一条数据

上面代码回抛出异常如下:

19:57:58.204 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2015)
18
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2016)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2017)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2018)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2019)
19:57:58.222 [parallel-1] INFO reactor.Flux.Interval.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
    at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:170)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2497)
        at FluxCreateTest.main(FluxCreateTest.java:17)

从打印的日志可以看出 concatMap 这个方法已经执行了 18条数据 ,背压缓存里缓存了2000 条数据。在2019 这里超出限制 2019-18 = 2001>2000 所以抛出异常

下面简单介绍下 reactor3 相关方法
Flux & Mono 这两个类

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种信号的特点:

Flux<T> 是一个标准的 Publisher<T>,表示为发出0到N个元素的异步序列;
Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素,可以被 onComplete 或 onError 信号选择性终止;

        Mono.create(monoSink -> {
            monoSink.success(11); // 因为Mono 表示单数据 因此包含了complete() 方法
           monoSink.error(new RuntimeException("这是个异常")); // 这条数据不会执行
        })
        Flux.create(fluxSink -> {
           fluxSink.next("value"); // 元素
           fluxSink.complete();  // 完成         //个人认为 这里相当于 monoSink.Success() = next()+complete(); 
           fluxSink.error( new RuntimeException()); // 异常
        });

上面是数据的创建者下面还有写常用的数据创建者

// 将list 作为数据源添加到flux
Flux.fromIterable(List.of(1,2,3))
// 将流作为数据源
Flux.fromStream(List.of(1,2,3).stream());
// 立即创建一个数据源 饿汉模式
Flux.just(List.of());
// 对应的就有懒汉模式 这里每次调用就会返回一个
Flux.defer(()->Flux.just(1));

// 如果我想定时的创造数据流怎么办
Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
// 当我的数据生产者不止一个怎么做
Flux.create();

Mono.

订阅者

// 阻塞的
Mono.just(1).block();
// 非阻塞的
Mono.just(1).subscribe();
// flux 非阻塞的
Flux.just(1).subscribe();
// flux 阻塞 第一条数据
Flux.just(1).blockFirst();
// 阻塞最后一条数据
Flux.just(1).blockLast();



使用flux 实现文件的读写

        Flux.fromStream(Files.lines(Paths.get("17336.txt")))
                .log()
                .subscribe(new BaseSubscriber<String>() {
                    BufferedWriter bufferedWriter = Files.newBufferedWriter(Paths.get("凡人修仙传.txt"));
                    @Override
                    @SneakyThrows
                    protected void hookOnNext(String value) {
                        super.hookOnNext(value);
                        bufferedWriter.write(value+"\n");
                        if(value.contains("------------")){ // 每写入一个章节就将数据写入文件并刷新缓存
                            bufferedWriter.flush();
                        }
                    }

                    @Override
                    @SneakyThrows
                    protected void hookOnComplete() {
                        super.hookOnComplete();
                        bufferedWriter.close();
                    }
                });

冷数据发布者VS热数据发布者

首先来看定义:
冷数据发布者: 在向订阅者发布数据的时候都会从起始位置开始,如果没有订阅者就不会做任何事情。
热数据发布者:冷数据和热数据相反,即当一个新的订阅者来订阅流的时候会在最新的位置开始发送数据。

热数据发布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        ConnectableFlux hotFlux = coldFlux.publish();
        hotFlux.subscribe((s)->{
            System.out.println(s);
        });
        hotFlux.connect();
        Thread.sleep(6000);
        hotFlux.subscribe(s->{
            System.out.println(s);
        });
        Thread.sleep(1000000);

执行结果

16:53:10.474 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:53:10.495 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:53:10.498 [main] INFO reactor.Flux.Interval.1 - request(256)
16:53:12.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
热数据第一个订阅者:0
16:53:12.519 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
热数据第一个订阅者:1
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
热数据第一个订阅者:2
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
热数据第一个订阅者:3
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:20.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
热数据第一个订阅者:4
热数据第二个发布者:4
16:53:20.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:22.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
热数据第一个订阅者:5
热数据第二个发布者:5
冷数据发布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        coldFlux.subscribe((s)->{
            System.out.println("第一个订阅者:"+s);
        });
        Thread.sleep(6000);
        coldFlux.subscribe(s->{
            System.out.println("第二个订阅者:"+s);
        });
        Thread.sleep(1000000);

执行结果

16:36:52.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:36:52.806 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:52.807 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:36:54.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:0
16:36:56.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
第一个订阅者:1
16:36:58.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
第一个订阅者:2
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:37:00.819 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
16:37:00.819 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:3
第二个订阅者:0
16:37:02.811 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
第一个订阅者:4
16:37:02.826 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
第二个订阅者:1
16:37:04.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
第一个订阅者:5
16:37:04.827 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
第二个订阅者:2

异步与并行

讲到这里目前看到当前调用的方法基本上都是同步的,除了Flux.Interval()。但是interavl 这个方法局限性太大,那么有没有让整个流异步的方法呢?
先看代码:

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 161, 17, 18, 19, 20, 21, 22);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(2)
                .runOn(Schedulers.parallel()) // 执行
                .log()
                .subscribe(System.out::println);
        //Thread.sleep(1_000_000);

执行结果

17:32:52.962 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:32:52.990 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.991 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)

Process finished with exit code 0

看到执行结果可以判断这段代码是异步非阻塞的,相关的日志并没有被打印出来
下面将线程休眠注释关掉 执行结果

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(8)
                .runOn(Schedulers.parallel())
                .log()
                .subscribe(System.out::println);
        Thread.sleep(1_000_000);

执行结果:

16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onNext(1)
1
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onNext(2)
2
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onNext(3)
3
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onNext(4)
4
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onNext(5)
5
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onNext(7)
7
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onNext(6)
6
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onNext(8)
8
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onComplete()

上述代码设计到两个方法 parallel(),runOn() ;其中parallel() 这个方法 我在看国内的文章里说这个是开启异步的方法,但是我在看国外的文章与相关的文档的时候并没有说他是开启异步的方法。而是开启并行的方法。那这里就有歧义,那这里就需要验证下;看如下代码:

        Flux.range(1,10).parallel(3).log().subscribe(System.out::println);

执行结果:

10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(1)
1
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(2)
2
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(3)
3
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(4)
4
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(5)
5
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(6)
6
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(7)
7
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(8)
8
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(9)
9
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(10)
10
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()

执行结果发现并不是异步的,那这个方法到底是干什么的?
其实文档里说的很明白了,就是并行,他的作用就是拓宽通道,本来只有一个通道的,在我使用parallel(3)方法的时候通道变为3 个, 之后订阅者订阅这三个通道。


image.png

publishOn VS subscribeOn 后续有机会再说

相关文档:
背压解释
官方文档
publishOn 和 subscribeOn

上一篇下一篇

猜你喜欢

热点阅读