spring project reactor
Project Reactor 核心原理解析:
https://www.jianshu.com/p/df395eb28f69
http://blog.yanickxia.site/2018/06/26/java/spring/projectreactor/
声明
just 方法只是创建反应式流的众多方式的一个。在实际工作中,更常见的通过反应式 Repository 将数据库查询结果,或通过 Spring 5 的 WebClient 将 HTTP 调用结果最为流的开始。
subscribe(注册)
subscribe 阶段同行会触发数据发送。在本例中,后面可以看到,对于 FluxArray,数据发送很简单,就是循环发送。而对于像数据库、RPC 这样的长久,则会触发请求的发送。
subscribe 动作会代理给具体的 Flux 来实现。
onSubscribe
onSubscribe 阶段指的是 Subscriber#onSubscribe 方法被依次调用的阶段。这个阶段会让各 Subscriber 知道 subscribe 方法已被触发,真正的处理流程马上就要开始。所以这一阶段的工作相对简单。
request
onSubscribe 阶段是表示订阅动作的方式,让各 Subscriber 知悉,准备开始处理数据。当最终的 Subscriber 做好处理数据的准备之后,它便会调用 Subscription 的 request 方法请求数据。
调用阶段
这一阶段将会通过调用 Subscriber 的 onNext 方法,从而进行真正的反应式的数据处理。
Reactor 指南中文版:
http://projectreactor.mydoc.io/?t=44474
Advanced RxJava http://akarnokd.blogspot.com/ 系列博客的中文翻译:
https://blog.piasy.com/AdvancedRxJava/archive/index.html
https://projectreactor.io/docs/core/release/api/
demo
List<PointsExchangeSkuDTO> pointsExchangeSkuDTOS = Flux.fromIterable(() -> skuList.iterator())
.parallel(10)
.runOn(Schedulers.newParallel("validPointExcehageSku", 10))
.filter(sku -> checkRealPrice(sku, channelType))//过滤实时价格
.filter(sku -> checkSkuStock(sku))//实时促销库存
.filter(sku -> checkSkuReginStock(sku, exchangeSkuQueryParam))//实时区域库存
.map(sku -> mapperSkuStatus(sku, exchangeSkuQueryParam.getCustomerPin()))//赋值sku信息
.collectSortedList((sku1, sku2) -> Integer.compare(sku1.getPoints(), sku2.getPoints())).block();