SpringCloud系列之网关gateway-10.过滤器机制
看过滤器的源码,大体上就从下图中的三个主要的类了:
image.png接下来我们还是实际的从debug一步步的看吧,首先发一个gateway的请求:
RoutePredicateHandlerMapping
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
首先我们把断点打到getHandlerInternal方法中,第一个if语句判断的是有没有management.server.port然后是不是与服务端口一致,如果不一致则不处理这个请求。
接下来这个方法就是放了一个属性值,值就是当前类名:
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
接下来是响应式编程的写法:
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
我们首先看lookupRoute(exchange)方法:
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}
这里我们主要看两个方法中的内容,一个是concatMap,一个是最后的map,如果对响应式编程不太了解,可以去了解一下
concatMap中的内容:
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
可以看到首先在map中放入了 GATEWAY_PREDICATE_ROUTE_ATTR为key,值为当前路由的id,然后把这个路由的predicate拿出来进行校验,我们可以看到有一个apply的方法,这个方法我们同样可以去我们之前做的predicate中去看一下:
下面是我们之前做的自定义过滤器,我们做了几个断言,一个是path,一个是method
@Bean
@Order
public RouteLocator customizedRoutes(RouteLocatorBuilder routeLocatorBuilder){
return routeLocatorBuilder.routes()
.route(r->r.path("/java/**")
.and().method(HttpMethod.GET)
.and().header("name")
.filters(f->f.stripPrefix(1)
.addResponseHeader("java-param","gateway-config")
// .filter(timerFilter)
)
.uri("lb://feign-client")
)
.route(r->r.path("/seckill/**")
.and().after(ZonedDateTime.now().plusMinutes(1))
.filters(f->f.stripPrefix(1))
.uri("lb://feign-client")
)
.build();
}
首先我们看一下method方法,可以点进去看一下:
public BooleanSpec method(HttpMethod... methods) {
return asyncPredicate(getBean(MethodRoutePredicateFactory.class).applyAsync(c -> {
c.setMethods(methods);
}));
}
然后再看一下MethodRoutePredicateFactory.class这个类,可以看到这个类中有一个apply方法:
@Override
public Predicate<ServerWebExchange> apply(Config config) {
return new GatewayPredicate() {
@Override
public boolean test(ServerWebExchange exchange) {
HttpMethod requestMethod = exchange.getRequest().getMethod();
return stream(config.getMethods()).anyMatch(httpMethod -> httpMethod == requestMethod);
}
@Override
public String toString() {
return String.format("Methods: %s", Arrays.toString(config.getMethods()));
}
};
}
是不是就终于看到了实现方法,这个方法拿到了当前的请求类型,然后与断言中的请求类型做对比。
然后我们继续回到主线剧情,concatMap会挨个把路由中的所有断言都做一个校验,只有通过了所有的校验,请求才会进入到这个路由中,然后最后一步map就会把这个路由返回出去。
返回之后就到了这里:
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
返回了路由之后,就会进入到下一步验证filter:
Mono.just(webHandler);
我们在类FilteringWebHandler中看下:
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
首先第一句Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);用来获取通过前面验证的路由,为什么能找到呢,我们可以回头看一下,在之前的RoutePredicateHandlerMapping类中
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
})
这段代码中的:
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
就会把选中的路由给放进去,所以在之后的filter验证中就可以直接拿到这个路由。
拿到这个路由之后,gateway会把这个路由下的所有的filter全部拿出来。
List<GatewayFilter> gatewayFilters = route.getFilters();
然后把所有的全局filter也拿出来,然后两种filter都放在一起:
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
然后就是排序:
AnnotationAwareOrderComparator.sort(combined);
我们可以点进去看一下:
public static void sort(List<?> list) {
if (list.size() > 1) {
list.sort(INSTANCE);
}
}
在看一下INSTANCE是个什么东西:
public static final AnnotationAwareOrderComparator INSTANCE = new AnnotationAwareOrderComparator();
/**
* This implementation checks for {@link Order @Order} or
* {@link javax.annotation.Priority @Priority} on various kinds of
* elements, in addition to the {@link org.springframework.core.Ordered}
* check in the superclass.
*/
@Override
@Nullable
protected Integer findOrder(Object obj) {
Integer order = super.findOrder(obj);
if (order != null) {
return order;
}
return findOrderFromAnnotation(obj);
}
可以看到进去之后除了他本身设置了一个单例之外,还有一个findOrder方法,我们继续详细的看super.findOrder(obj);:
@Nullable
protected Integer findOrder(Object obj) {
return (obj instanceof Ordered ? ((Ordered) obj).getOrder() : null);
}
这个方法是不是看懂了? 如果这个对象类型是Ordered,那么就调用它的getOrder方法,还好我们自定义过滤器是实现了Ordered类,并且重写了getOrder方法:
@Component
@Slf4j
//GlobalFilter 全局filter GatewayFilter 局部filter
public class TimerFilter implements GatewayFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
StopWatch timer = new StopWatch();
timer.start(exchange.getRequest().getURI().getRawPath());
return chain.filter(exchange).then(
Mono.fromRunnable( ()-> {
timer.stop();
log.info(timer.prettyPrint());
}
)
);
}
@Override
public int getOrder() {
return 0;
}
}
所以我们这个自定义的过滤器的排序号就是0;
然后findOrder方法最后一句就是
findOrderFromAnnotation(obj);
当没有继承实现Ordered类,那么就会进行一系列的操作让他有一个order。
我们继续主线往下走:
return new DefaultGatewayFilterChain(combined).filter(exchange);
点进去看:
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
private final int index;
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
这段方法非常的有意思,首先这个是FilteringWebHandler里面的一个内部类,然后里面的方法主要是挨个把filter都给验证一遍:
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
return filter.filter(exchange, chain);
}
尤其是这一段,首先index默认是0,然后这里首先会根据index取到遍历的filter,然后新建一个这个类,构造器的两个参数分别是父级类和index,这里index进行了加1的操作。
然后filter.filter(exchange,chain); 首先当前的filter执行了类下面的filter方法,然后把新建的chain给传入:
就拿我们自定义的filter举例:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
StopWatch timer = new StopWatch();
timer.start(exchange.getRequest().getURI().getRawPath());
return chain.filter(exchange).then(
Mono.fromRunnable( ()-> {
timer.stop();
log.info(timer.prettyPrint());
}
)
);
}
在执行了本身的filter之后,会使用传入的chain进行下一次的filter,就这么一个类接着一个类的把所有的filter全部校验完。
这种方法写的很优雅,不像是不断的for,不断的遍历,这个写法就显得比较清晰明了。
关于gateway 路由过滤器机制解析源码就到此。