SpringCloud系列之网关gateway-10.过滤器机制

2021-09-16  本文已影响0人  那钱有着落吗

看过滤器的源码,大体上就从下图中的三个主要的类了:

image.png

接下来我们还是实际的从debug一步步的看吧,首先发一个gateway的请求:

RoutePredicateHandlerMapping

        @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

首先我们把断点打到getHandlerInternal方法中,第一个if语句判断的是有没有management.server.port然后是不是与服务端口一致,如果不一致则不处理这个请求。

接下来这个方法就是放了一个属性值,值就是当前类名:

exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

接下来是响应式编程的写法:

return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));

我们首先看lookupRoute(exchange)方法:

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
                // individually filter routes so that filterWhen error delaying is not a
                // problem
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    return r.getPredicate().apply(exchange);
                })
                        // instead of immediately stopping main flux due to error, log and
                        // swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /*
         * TODO: trace logging if (logger.isTraceEnabled()) {
         * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
         */
    }

这里我们主要看两个方法中的内容,一个是concatMap,一个是最后的map,如果对响应式编程不太了解,可以去了解一下

concatMap中的内容:

exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);

可以看到首先在map中放入了 GATEWAY_PREDICATE_ROUTE_ATTR为key,值为当前路由的id,然后把这个路由的predicate拿出来进行校验,我们可以看到有一个apply的方法,这个方法我们同样可以去我们之前做的predicate中去看一下:

下面是我们之前做的自定义过滤器,我们做了几个断言,一个是path,一个是method

@Bean
    @Order
    public RouteLocator customizedRoutes(RouteLocatorBuilder routeLocatorBuilder){
        return routeLocatorBuilder.routes()
                .route(r->r.path("/java/**")
                        .and().method(HttpMethod.GET)
                        .and().header("name")
                        .filters(f->f.stripPrefix(1)
                                .addResponseHeader("java-param","gateway-config")
//                                .filter(timerFilter)
                        )
                        .uri("lb://feign-client")
                )
                .route(r->r.path("/seckill/**")
                        .and().after(ZonedDateTime.now().plusMinutes(1))
                        .filters(f->f.stripPrefix(1))
                        .uri("lb://feign-client")
                )
                .build();
    }

首先我们看一下method方法,可以点进去看一下:

public BooleanSpec method(HttpMethod... methods) {
        return asyncPredicate(getBean(MethodRoutePredicateFactory.class).applyAsync(c -> {
            c.setMethods(methods);
        }));
    }

然后再看一下MethodRoutePredicateFactory.class这个类,可以看到这个类中有一个apply方法:

@Override
  public Predicate<ServerWebExchange> apply(Config config) {
      return new GatewayPredicate() {
          @Override
          public boolean test(ServerWebExchange exchange) {
              HttpMethod requestMethod = exchange.getRequest().getMethod();
              return stream(config.getMethods()).anyMatch(httpMethod -> httpMethod == requestMethod);
          }

          @Override
          public String toString() {
              return String.format("Methods: %s", Arrays.toString(config.getMethods()));
          }
      };
  }

是不是就终于看到了实现方法,这个方法拿到了当前的请求类型,然后与断言中的请求类型做对比。

然后我们继续回到主线剧情,concatMap会挨个把路由中的所有断言都做一个校验,只有通过了所有的校验,请求才会进入到这个路由中,然后最后一步map就会把这个路由返回出去。

返回之后就到了这里:

return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));

返回了路由之后,就会进入到下一步验证filter:

Mono.just(webHandler);

我们在类FilteringWebHandler中看下:

@Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        // TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

首先第一句Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);用来获取通过前面验证的路由,为什么能找到呢,我们可以回头看一下,在之前的RoutePredicateHandlerMapping类中

return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                })

这段代码中的:

exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);

就会把选中的路由给放进去,所以在之后的filter验证中就可以直接拿到这个路由。

拿到这个路由之后,gateway会把这个路由下的所有的filter全部拿出来。

List<GatewayFilter> gatewayFilters = route.getFilters();

然后把所有的全局filter也拿出来,然后两种filter都放在一起:

List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);

然后就是排序:

AnnotationAwareOrderComparator.sort(combined);

我们可以点进去看一下:

public static void sort(List<?> list) {
        if (list.size() > 1) {
            list.sort(INSTANCE);
        }
    }

在看一下INSTANCE是个什么东西:

public static final AnnotationAwareOrderComparator INSTANCE = new AnnotationAwareOrderComparator();


    /**
     * This implementation checks for {@link Order @Order} or
     * {@link javax.annotation.Priority @Priority} on various kinds of
     * elements, in addition to the {@link org.springframework.core.Ordered}
     * check in the superclass.
     */
    @Override
    @Nullable
    protected Integer findOrder(Object obj) {
        Integer order = super.findOrder(obj);
        if (order != null) {
            return order;
        }
        return findOrderFromAnnotation(obj);
    }

可以看到进去之后除了他本身设置了一个单例之外,还有一个findOrder方法,我们继续详细的看super.findOrder(obj);:

@Nullable
    protected Integer findOrder(Object obj) {
        return (obj instanceof Ordered ? ((Ordered) obj).getOrder() : null);
    }

这个方法是不是看懂了? 如果这个对象类型是Ordered,那么就调用它的getOrder方法,还好我们自定义过滤器是实现了Ordered类,并且重写了getOrder方法:


@Component
@Slf4j
//GlobalFilter 全局filter  GatewayFilter 局部filter
public class TimerFilter implements GatewayFilter, Ordered {


    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        StopWatch timer = new StopWatch();
        timer.start(exchange.getRequest().getURI().getRawPath());

        return chain.filter(exchange).then(
                Mono.fromRunnable( ()-> {
                            timer.stop();
                            log.info(timer.prettyPrint());
                        }
                )
        );
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

所以我们这个自定义的过滤器的排序号就是0;

然后findOrder方法最后一句就是

findOrderFromAnnotation(obj);

当没有继承实现Ordered类,那么就会进行一系列的操作让他有一个order。

我们继续主线往下走:

return new DefaultGatewayFilterChain(combined).filter(exchange);

点进去看:

private static class DefaultGatewayFilterChain implements GatewayFilterChain {

        private final int index;

        private final List<GatewayFilter> filters;

        DefaultGatewayFilterChain(List<GatewayFilter> filters) {
            this.filters = filters;
            this.index = 0;
        }

        private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
            this.filters = parent.getFilters();
            this.index = index;
        }

        public List<GatewayFilter> getFilters() {
            return filters;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }

    }

这段方法非常的有意思,首先这个是FilteringWebHandler里面的一个内部类,然后里面的方法主要是挨个把filter都给验证一遍:

if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }

尤其是这一段,首先index默认是0,然后这里首先会根据index取到遍历的filter,然后新建一个这个类,构造器的两个参数分别是父级类和index,这里index进行了加1的操作。

然后filter.filter(exchange,chain); 首先当前的filter执行了类下面的filter方法,然后把新建的chain给传入:

就拿我们自定义的filter举例:

@Override
   public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

       StopWatch timer = new StopWatch();
       timer.start(exchange.getRequest().getURI().getRawPath());

       return chain.filter(exchange).then(
               Mono.fromRunnable( ()-> {
                           timer.stop();
                           log.info(timer.prettyPrint());
                       }
               )
       );
   }

在执行了本身的filter之后,会使用传入的chain进行下一次的filter,就这么一个类接着一个类的把所有的filter全部校验完。

这种方法写的很优雅,不像是不断的for,不断的遍历,这个写法就显得比较清晰明了。

关于gateway 路由过滤器机制解析源码就到此。

上一篇下一篇

猜你喜欢

热点阅读