响应式编程小记
背景
同事在搭建nacos + spring-cloud-gateway-core 2.1.0.RELEASE 动态路由配置的过程中,发现spring cloud gateway不能删除路由。
问题分析
通过断点调试,使用了InMemoryRouteDefinitionRepository也就是内存级的动态路由管理,删除方法也就是delete,其中代码如下:
package org.springframework.cloud.gateway.route;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.cloud.gateway.support.NotFoundException;
import static java.util.Collections.synchronizedMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Spencer Gibb
*/
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {
private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());
@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap( r -> {
routes.put(r.getId(), r);
return Mono.empty();
});
}
@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (routes.containsKey(id)) {
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
});
}
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(routes.values());
}
}
通过断点调试发现routes.remove(id);
代码并没有执行。
相关调用代码如下:
this.routeDefinitionWriter.delete(Mono.just(id));
简化代码
通过代码了解到spring cloud gateway中使用了reactor(反应式编程)来实现异步的方法执行,所以初步判断既然是异步执行,肯定在会有一个时间差,是不是这个异步导致的看不到代码执行呢?通过反复尝试,发现不是时间的问题,不管等待多少时间,这个代码都不会执行,routes中都不会删除对应的键值。
为了更方便的测试把代码简化提取为如下:
public class MonoTest {
private final Map<String, String> routes = synchronizedMap(new LinkedHashMap<String, String>());
public static void main(String[] args) {
MonoTest monoTest = new MonoTest();
Map<String, String> routes = monoTest.routes;
routes.put("123", "test11111");
routes.put("561", "test11111");
monoTest.delete(Mono.just("123"));
System.out.println(monoTest.routes);
}
public void add(String routeId) {
routes.put(routeId, "test11111");
}
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
System.out.println("准备进入判断语句 routes.containsKey(id)");
if (routes.containsKey(id)) {
System.out.println("routes.containsKey(id)");
routes.remove(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(
new NotFoundException("RouteDefinition not found: " + routeId)));
});
}
}
需要在pom中引入reactor依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
Reactor相关知识
既然代码中有Mono这个鬼东西,所以需要对其进行了解。
Project Reactor(以下简称“Reactor”)与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor中的发布者(Publisher)由Flux
和Mono
两个类定义,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0..N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0..1)元素的结果。
既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
-
Flux类型
下图所示就是一个Flux类型的数据流,黑色箭头是时间轴。它连续发出“1” - “6”共6个元素值,以及一个完成信号(图中⑥后边的加粗竖线来表示),完成信号告知订阅者数据流已经结束。
Flux类型
-
Mono类型
下图所示是一个Mono类型的数据流,它发出一个元素值后,又发出一个完成信号。
Mono类型
既然Flux具有发布一个数据元素的能力,为什么还要专门定义一个Mono类呢?举个例子,一个HTTP请求产生一个响应,所以对其进行“count”操作是没有多大意义的。表示这样一个结果的话,应该用Mono<HttpResponse>而不是 Flux<HttpResponse>,对于的操作通常只用于处理 0/1 个元素。它们从语义上就原生包含着元素个数的信息,从而避免了对Mono对象进行多元素场景下的处理。
- just
我们可以用如下代码声明上边两幅图所示的Flux和Mono:
Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);
-
三种信号 元素值、错误信号、完成信号
这三种信号都不是一定要具备的:
首先,错误信号和完成信号都是终止信号,二者不可能同时共存;
如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流;
如果没有错误信号和完成信号,那么就是一个无限数据流。 -
订阅前什么都不会发生
// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
这里需要注意的一点是,Flux.just(1, 2, 3, 4, 5, 6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流。所以,订阅前什么都不会发生。
问题解决
看完上文就明白为什么没有删除了,订阅之前什么都不会发生,所以想要执行对应的方法,必须手动调用subscribe()方法。
monoTest.delete(Mono.just("123")).subcribe();
详细学习reactor可以看下面的文章:
https://blog.51cto.com/liukang/2090191