spring project reactor

2019-12-02  本文已影响0人  特仑苏纯牛乳

Project Reactor 核心原理解析:
https://www.jianshu.com/p/df395eb28f69
http://blog.yanickxia.site/2018/06/26/java/spring/projectreactor/

声明

just 方法只是创建反应式流的众多方式的一个。在实际工作中,更常见的通过反应式 Repository 将数据库查询结果,或通过 Spring 5 的 WebClient 将 HTTP 调用结果最为流的开始。

subscribe(注册)

subscribe 阶段同行会触发数据发送。在本例中,后面可以看到,对于 FluxArray,数据发送很简单,就是循环发送。而对于像数据库、RPC 这样的长久,则会触发请求的发送。
subscribe 动作会代理给具体的 Flux 来实现。

onSubscribe

onSubscribe 阶段指的是 Subscriber#onSubscribe 方法被依次调用的阶段。这个阶段会让各 Subscriber 知道 subscribe 方法已被触发,真正的处理流程马上就要开始。所以这一阶段的工作相对简单。

request

onSubscribe 阶段是表示订阅动作的方式,让各 Subscriber 知悉,准备开始处理数据。当最终的 Subscriber 做好处理数据的准备之后,它便会调用 Subscription 的 request 方法请求数据。

调用阶段

这一阶段将会通过调用 Subscriber 的 onNext 方法,从而进行真正的反应式的数据处理。

Reactor 指南中文版:
http://projectreactor.mydoc.io/?t=44474

Advanced RxJava http://akarnokd.blogspot.com/ 系列博客的中文翻译:
https://blog.piasy.com/AdvancedRxJava/archive/index.html

https://projectreactor.io/docs/core/release/api/

demo

List<PointsExchangeSkuDTO> pointsExchangeSkuDTOS = Flux.fromIterable(() -> skuList.iterator())
                .parallel(10)
                .runOn(Schedulers.newParallel("validPointExcehageSku", 10))
                .filter(sku -> checkRealPrice(sku, channelType))//过滤实时价格
                .filter(sku -> checkSkuStock(sku))//实时促销库存
                .filter(sku -> checkSkuReginStock(sku, exchangeSkuQueryParam))//实时区域库存
                .map(sku -> mapperSkuStatus(sku, exchangeSkuQueryParam.getCustomerPin()))//赋值sku信息
                .collectSortedList((sku1, sku2) -> Integer.compare(sku1.getPoints(), sku2.getPoints())).block();
上一篇下一篇

猜你喜欢

热点阅读