Spring-BootJava

SpringCloud 网关组件 Gateway 原理深度解析

2023-01-07  本文已影响0人  互联网高级架构师

引入网关的必要性

在微服务架构下,我们一般会把相对独立的业务或功能划分为不同的服务,不同服务之间相互隔离,独立部署。这些微服务由服务注册中心集中管理。对于来自外部的用户请求来说,在处理请求的核心业务逻辑之前,一般还需要对请求做一些预处理,如权限校验、监控统计、流量限制等。如果外部请求直接到达微服务,那么所有的服务都需要各自负责处理这些功能,会造成这部分功能逻辑在各个服务重复出现,增加了未来对这些基础功能的维护升级的成本。

所以在微服务环境下,我们还需要一个网关组件来作为请求入口。一些基础的请求预处理的逻辑可以统一实现在网关这一层,这样业务服务只需要专注于处理业务逻辑即可。另外,引入网关作为统一请求入口之后,还可以利用网关来实现一些其他的功能,比如服务保护、灰度发布等。

1、Spring Cloud Gateway 简介

Spring Cloud Gateway 是 Spring Cloud 微服务生态下的网关组件。Spring Cloud Gateway 是基于 Spring 5 和 Spring Boot 2 搭建的,本质上是一个 Spring Boot 应用。在详细介绍其基本原理之前,先看一下通常而言,可以由微服务网关提供的功能。

在 Spring Cloud Gateway 发布之前,Spring Cloud 使用的是由 Netflix 开源的 Zuul 1 作为网关组件。Zuul 1 是基于传统的 Servlet API 开发的,使用了阻塞的网络模型,每个请求需要分配专门的线程处理,所以资源开销比较大,在高并发的情况下需要创建大量的线程来处理请求,线程数目会成为系统的瓶颈。

作为取代 Spring Cloud Zuul 的组件,Spring Cloud Gateway 网络层使用了基于非阻塞的 Netty,解决了线程数瓶颈从而提升了系统性能

微服务网关的功能

2、Spring Cloud Gateway 基本原理

Spring Cloud Gateway 使用了 Spring WebFlux 非阻塞网络框架,网络层默认使用了高性能非阻塞的 Netty Server,解决了 Spring Cloud Zuul 因为阻塞的线程模型带来的性能下降的问题。

Spring WebFlux 的内容介绍,参考作者文章: Spring WebFlux 和 Spring MVC 对比分析

Gateway 本身是一个 Spring Boot 应用,它处理请求是逻辑是根据配置的路由对请求进行预处理和转发。Gateway 有几个比较核心的概念:

Gateway 在启动时会创建 Netty Server,由它接收来自 Client 的请求。收到请求后根据路由的匹配条件找到第一个满足条件的路由,然后请求在被该路由配置的过滤器处理后由 Netty Client 转到目标服务。服务返回响应后会再次被过滤器处理,最后返回给 Client。

Gateway 路由配置

Spring Cloud Gateway 本身提供了很多 Predicate 和 Filter 的实现,一些基本的功能可以通过这些现成的 Predicate 和 Filter 配置实现。这些 Gateway 本身提供的 Predicate 和 Filter 在官方文档上有详细的介绍,这里给一个大致的例子:

spring:
  cloud:
    gateway:
      routes:
      - id: test_route
        uri: lb://service-A
        predicates:
         - Path=/hello
        filters:
        - SetRequestHeader=X-Request-Red, Blue

路由是 Gateway 的核心构件,不同的路由根据匹配条件可以处理不同类型的请求,并转发到对应的目标服务。一个路由由以下几个属性组成:

Gateway 请求路由原理

Gateway 使用了 Spring WebFlux 框架,该框架处理请求的入口在类 DispatcherHandler 。它会根据提供的 HandlerMapping 来获取处理请求的 Handler 方法。Gateway 应用对 HandlerMapping 的实现是 RoutePredicateHandlerMapping

  1. 进来的请求由 DispatcherHandler 处理。
  2. DispatcherHandler 根据 RoutePredicateHandlerMapping 获取 Handler 方法。
  3. RoutePredicateHandlerMapping 依赖 RouteLocator 获取所有路由配置并根据匹配条件打到请求匹配的路由。
  4. RoutePredicateHandlerMapping 把请求交给 FilteringWebHandler 处理。
  5. FilteringWebHandler 从请求匹配的路由获取对应的路由 Filter,并和全局 Filter 合并构造 GatewayFilterChain,请求最终由 GatewayFilterChain 里的 Filter 按顺序处理。

Spring Cloud Gateway 上下文(ServerWebExchange)

SpringCloud Gateway 的上下文是 ServerWebExchange,请求的信息都存储在 ServerWebExchange 中,在网关上的后续操作都是基于上下文操作的,在 http 请求到达网关之后,网关入口是ReactorHttpHandlerAdapter#apply 方法,去获取请求的 request 和 response,构建当次请求的上下文供后续 filter 使用:

public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

        @Override
    public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {
                // 获取请求的Request,构建ReactorServerHttpRequest
            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
            // 构建ServerHttpResponse
            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

            if (request.getMethod() == HttpMethod.HEAD) {
                response = new HttpHeadResponseDecorator(response);
            }
            // 交给HttpWebHandlerAdapter构建上下文ServerWebExchange
            return this.httpHandler.handle(request, response)
                    .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                    .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
        }
        catch (URISyntaxException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URI: " + ex.getMessage());
            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
            return Mono.empty();
        }
    }
}

构建完 request 和 response 后,交给 HttpWebHandlerAdapter 构建上下文 ServerWebExchange:

public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
    public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {

        if (this.forwardedHeaderTransformer != null) {
            request = this.forwardedHeaderTransformer.apply(request);
        }
        // 构建请求的上下文
        ServerWebExchange exchange = createExchange(request, response);

        LogFormatUtils.traceDebug(logger, traceOn ->
                exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                        (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

        return getDelegate().handle(exchange)
                .doOnSuccess(aVoid -> logResponse(exchange))
                .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
                .then(Mono.defer(response::setComplete));
    }
}

Spring Cloud Gateway 读取路由(RouteDefinition)

我们在配置文件中配置的一个路由规则,对应到 Java 类就是 GatewayProperties,Spring Boot 会将配置文件映射为 Java 类,例如上文的配置:

spring:
  cloud:
    gateway:
      routes:
      - id: test_route
        uri: lb://service-A
        predicates:
         - Path=/hello
        filters:
        - SetRequestHeader=X-Request-Red, Blue

路由信息映射到 GatewayProperties 后如何获取其中的 RouteDefinition?

答案是通过 RouteDefinitionLocator。

public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {

    private final GatewayProperties properties;
    // 构造函数设置properties
    public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
        this.properties = properties;
    }
    // 从properties中读取RouteDefinition
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return Flux.fromIterable(this.properties.getRoutes());
    }
}

当然我们获取路由信息的地方不止 properties 一种,还可以从 内存,缓存,甚至注册中心等。CompositeRouteDefinitionLocator 利用委派器模式允许我们组合读取路由信息。

public class CompositeRouteDefinitionLocator implements RouteDefinitionLocator {

    private final Flux<RouteDefinitionLocator> delegates;
    // 将 RouteDefinitionLocator 组合
    public CompositeRouteDefinitionLocator(Flux<RouteDefinitionLocator> delegates) {
        this.delegates = delegates;
    }
    // 委托给 RouteDefinitionRepository 执行读取
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return this.delegates.flatMap(RouteDefinitionLocator::getRouteDefinitions);
    }
}

Spring Cloud Gateway 的 GlobalFilter

GlobalFilter 是所有被 Gateway 拦截的 http 请求都要做的处理;GatewayFilter 是根据路由配置匹配predicate 的 http 请求才会做的处理。

全局拦截器,是所有被拦截到的 http 请求都要去做的处理;例如拿到一个 http 请求后,我们的目的是转发到下游服务,请求结果并返回,那么所有被拦截到的 http 请求都需要做下列几件事:

  1. 按照 predicate 把符合规则的 url 转换为真正要去请求的 url;
  2. 调用真正的下游服务(基于 netty 实现的 http 调用,具体代码在 NettyRoutingFilter 类中);
  3. 得到 response,返回给调用方。
public interface GlobalFilter {
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

接口中只有一个 filter 方法,实现类实现该接口后在 filter 中去做具体拦截逻辑,这些 Filter 都实现了 GlobalFilter 接口:

  • AdaptCachedBodyGlobalFilter:优先级最高的 Filter,请求到 gateway 后,将上下文ServerWebExchange 中已有的缓存删除 请求信息,将此次的请求信息缓存到上下文中。

  • ForwardPathFilter:如果该请求还未被路由或 URI对象的属性不是 forward,则将该请求对应配置的 Route 信息中 uri 的 path 设置到上下文 ServerWebExchange 中。

  • RouteToRequestUrlFilter:将此次请求的 uri 和配置的 Route 规则做 merged 处理,拿到真正代理的下游服务的地址,将得到的 url 放到上下文中,key 为 GATEWAY_REQUEST_URL_ATTR

  • LoadBalancerClientFilter:网关提供了负载均衡的 Filter,具体负载规则可以自己实现。

  • NoLoadBalancerClientFilter:没有负载均衡的拦截器。

  • NettyRoutingFilter:网关的 http 是基于 netty 实现的,若此次请求 scheme 是 http 或 https 则使用基于 netty 的 httpClient 执行调用,将返回结果写入上下文中。

  • NettyWriteResponseFilter:基于 Web Flux,若上下文中存在 CLIENT_RESPONSE_CONN_ATTR,将响应数据返回。

  • WebClientHttpRoutingFilter:作用同 NettyRoutingFilter,方式同 LoadBalancerClientFilter。

  • WebsocketRoutingFilter:路由 WebSocket 请求,校验逻辑在WebsocketRoutingFilter#changeSchemeIfIsWebSocketUpgrade 中。

  • WebClientWriteResponseFilter:作用同 NettyWriteResponseFilter。

  • ForwardRoutingFilter:设置此次请求已被路由。

  • GatewayMetricsFilter:统计网关的性能指标。

Spring Cloud Gateway 的 GatewayFilter

GatewayFilter 是面向开发人员的,因需适配,当我们需要给符合 predicate 的 url 做一些处理时通过配置就可添加,例如,我们想给 path 匹配上 /test/** 的 url 添加 header,通过下列配置就可添加,这类配置是根据业务需求进行的特殊配置。

public interface GatewayFilter extends ShortcutConfigurable {

    /**
     * Name key.
     */
    String NAME_KEY = "name";

    /**
     * Value key.
     */
    String VALUE_KEY = "value";

    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

接口定义中多了 NAME_KEYVALUE_KEY,原因是 GatewayFilter 是面向开发人员的,例如我们需要配置给 path符合 /test/** 的请求添加 header 时,header 是 key-value 形式,这时候就用到了:

public class AddRequestHeaderGatewayFilterFactory extends AbstractNameValueGatewayFilterFactory {

    @Override
    public GatewayFilter apply(NameValueConfig config) {
        return (exchange, chain) -> {
        // 将要添加的key-value添加到上下文的header中
                    ServerHttpRequest request = exchange.getRequest().mutate()
                                    .header(config.getName(), config.getValue()).build();

                    return chain.filter(exchange.mutate().request(request).build());
        };
    }
}

GlobalFilter 和 GatewayFilter 整合应用

每个 Filter 中都有一个 Order 属性,在执行时是在 FilteringWebHandler#handle方法 中对 GlobalFilter 和 GatewayFilter 进行的整合和排序,具体执行在 FilteringWebHandler#filter方法

    /**
    * 整合Filter 
    */
    public Mono<Void> handle(ServerWebExchange exchange) {
            // 根据Route信息取出配置的GatewayFilter集合
            Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
            List<GatewayFilter> gatewayFilters = route.getFilters();
            // 取出globalFilters
            List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
            // 将GatewayFilter添加到combined
            combined.addAll(gatewayFilters);
            // combined根据Order排优先级
            AnnotationAwareOrderComparator.sort(combined);

            if (logger.isDebugEnabled()) {
                    logger.debug("Sorted gatewayFilterFactories: " + combined);
            }

            return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

    /**
    * 执行Filter 
    */
    public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                    if (this.index < filters.size()) {
                            GatewayFilter filter = filters.get(this.index);
                            DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                                            this.index + 1);
                            return filter.filter(exchange, chain);
                    }
                    else {
                            return Mono.empty(); // complete
                    }
            });
    }

GlobalFilter 和 GatewayFilter 自定义 Filter

public class GlobalTestFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        if("符合业务逻辑,处理完业务逻辑,继续执行下一个filter"){
            return chain.filter(exchange);
        }
        //不符合业务逻辑,直接返回
        return "按照不符合业务逻辑处理";
    }
}
public class TestGatewayFilterFactory extends AbstractGatewayFilterFactory<TestGatewayFilterFactory.Config> {

    public TestGatewayFilterFactory() {
        super(TestGatewayFilterFactory.Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            if("符合条件,处理业务逻辑,继续执行下一个Filter"){
                return chain.filter(exchange);
            }
             // 不符合条件,直接返回
            return "false";
        };
    }

    public static class Config {

        private String businessAttributes;

        public String getBusinessAttributes() {
            return businessAttributes;
        }

        public void setBusinessAttributes(String businessAttributes) {
            this.businessAttributes = businessAttributes;
        }
    }
}

3、Spring Cloud Gateway 工作过程

网关启动阶段

  1. Yaml 文件和 GatewayProperties 文件映射,映射处理源码在 JavaBeanBinder.BeanProperty#getValue –> CollectionBinder#merge —> Binder#bindBean;
  2. 加载 Locator Bean,为后续读取 RouteDefinition 做准备【GatewayAutoConfiguration】;
public class GatewayAutoConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(
            GatewayProperties properties) {
        return new PropertiesRouteDefinitionLocator(properties);
    }

    @Bean
    @ConditionalOnMissingBean(RouteDefinitionRepository.class)
    public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
        return new InMemoryRouteDefinitionRepository();
    }

    @Bean
    @Primary
    public RouteDefinitionLocator routeDefinitionLocator(
            List<RouteDefinitionLocator> routeDefinitionLocators) {
        return new CompositeRouteDefinitionLocator(
                Flux.fromIterable(routeDefinitionLocators));
    }

    @Bean
    @Primary
    public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
        return new CachingRouteLocator(
                new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
    }
}
  1. 初始化GlobalFilters【FilteringWebHandler】;
public class GatewayAutoConfiguration {
    @Bean
    public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
        return new FilteringWebHandler(globalFilters);
    }
}
public class FilteringWebHandler implements WebHandler {

    private final List<GatewayFilter> globalFilters;

    // 构造函数中设置globalFiltersglobalFilters
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }
    // 设置globalFilters
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        return filters.stream().map(filter -> {
            GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
            if (filter instanceof Ordered) {
                int order = ((Ordered) filter).getOrder();
                return new OrderedGatewayFilter(gatewayFilter, order);
            }
            return gatewayFilter;
        }).collect(Collectors.toList());
    }
}
  1. 初始化 predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration –> RouteDefinitionRouteLocator】;
public class RouteDefinitionRouteLocator
        implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

    // 构造函数中初始化
    public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
            List<RoutePredicateFactory> predicates,
            List<GatewayFilterFactory> gatewayFilterFactories,
            GatewayProperties gatewayProperties, ConversionService conversionService) {
        this.routeDefinitionLocator = routeDefinitionLocator;
        this.conversionService = conversionService;
        initFactories(predicates);
        gatewayFilterFactories.forEach(
                factory -> this.gatewayFilterFactories.put(factory.name(), factory));
        this.gatewayProperties = gatewayProperties;
    }

    // 设置predicate工厂
    private void initFactories(List<RoutePredicateFactory> predicates) {
        predicates.forEach(factory -> {
            String key = factory.name();
            if (this.predicates.containsKey(key)) {
                this.logger.warn("A RoutePredicateFactory named " + key
                        + " already exists, class: " + this.predicates.get(key)
                        + ". It will be overwritten.");
            }
            this.predicates.put(key, factory);
            if (logger.isInfoEnabled()) {
                logger.info("Loaded RoutePredicateFactory [" + key + "]");
            }
        });
    }

    public Flux<Route> getRoutes() {
        // 从RouteDefinitions转换为Route,转换过程在convertToRoute方法中实现
        return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("RouteDefinition matched: " + route.getId());
                    }
                    return route;
                });
    }

    // RouteDefinition到Route的转换
    private Route convertToRoute(RouteDefinition routeDefinition) {
        // 从routeDefinition获取predicate
        AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
        // 从routeDefinition获取gatewayFilters
        List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
        // 构造Route
        return Route.async(routeDefinition).asyncPredicate(predicate)
                .replaceFilters(gatewayFilters).build();
    }

    // 获取GatewayFilters
    private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
        List<GatewayFilter> filters = new ArrayList<>();
        // 如果默认filter不为空,则去加载
        if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
            filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
                    this.gatewayProperties.getDefaultFilters()));
        }
        // 如果Filter不为空,则
        if (!routeDefinition.getFilters().isEmpty()) {
            filters.addAll(loadGatewayFilters(routeDefinition.getId(),
                    routeDefinition.getFilters()));
        }

        AnnotationAwareOrderComparator.sort(filters);
        return filters;
    }

    @SuppressWarnings("unchecked")
    private List<GatewayFilter> loadGatewayFilters(String id,
            List<FilterDefinition> filterDefinitions) {
        List<GatewayFilter> filters = filterDefinitions.stream().map(definition -> {
            // 从gatewayFilterFactories中根据key获取factory
            GatewayFilterFactory factory = this.gatewayFilterFactories
                    .get(definition.getName());
            if (factory == null) {
                throw new IllegalArgumentException(
                        "Unable to find GatewayFilterFactory with name "
                                + definition.getName());
            }
            // 获取definition设置的Filter值
            Map<String, String> args = definition.getArgs();
            if (logger.isDebugEnabled()) {
                logger.debug("RouteDefinition " + id + " applying filter " + args + " to "
                        + definition.getName());
            }

            Map<String, Object> properties = factory.shortcutType().normalize(args,
                    factory, this.parser, this.beanFactory);
            // 每一个工厂中都有一个静态内部类Config,目的是存储我们设置的Filter值
            Object configuration = factory.newConfig();
            // 将后几个参数的信息绑定到configuration
            ConfigurationUtils.bind(configuration, properties,
                    factory.shortcutFieldPrefix(), definition.getName(), validator,
                    conversionService);
            // 获得GatewayFilter
            GatewayFilter gatewayFilter = factory.apply(configuration);
            if (this.publisher != null) {
                this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
            }
            return gatewayFilter;
        }).collect(Collectors.toList());

        ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
        for (int i = 0; i < filters.size(); i++) {
            GatewayFilter gatewayFilter = filters.get(i);
            if (gatewayFilter instanceof Ordered) {
                ordered.add(gatewayFilter);
            }
            else {
                ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
            }
        }

        return ordered;
    }
}

请求处理阶段

  1. ReactorHttpHandlerAdapter#apply 方法是请求到网关执行的入口;
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

        public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {
            // 获取请求的request和response
            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

            if (request.getMethod() == HttpMethod.HEAD) {
                response = new HttpHeadResponseDecorator(response);
            }
            // 给到HttpWebHandlerAdapter执行构建
            return this.httpHandler.handle(request, response)
                    .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                    .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
        }
        catch (URISyntaxException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URI: " + ex.getMessage());
            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
            return Mono.empty();
        }
    }
}
  1. HttpWebHandlerAdapter#handle 构建网关上下文 ServerWebExchange;
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
    public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {

        if (this.forwardedHeaderTransformer != null) {
            request = this.forwardedHeaderTransformer.apply(request);
        }
        // 根据请求的request、response构建网关上下文
        ServerWebExchange exchange = createExchange(request, response);

        LogFormatUtils.traceDebug(logger, traceOn ->
                exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
                        (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

        return getDelegate().handle(exchange)
                .doOnSuccess(aVoid -> logResponse(exchange))
                .onErrorResume(ex -> handleUnresolvedError(exchange, ex))
                .then(Mono.defer(response::setComplete));
    }
}
  1. DispatcherHandler 用于 Http 请求处理器/控制器的中央分发处理器,把请求分发给已经注册的处理程序处理,DispatcherHandler 遍历 Mapping 获取对应的 handler,网关一共有 6 个 handlerMapping【此处会找到 RoutePredicateHandlerMapping,通过 RoutePredicateHandlerMapping 获取 FilteringWebHandler,通过 FilteringWebHandler 获取】;
public class DispatcherHandler implements WebHandler, ApplicationContextAware {

    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        // 遍历mapping获取handler
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
}
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

    private final FilteringWebHandler webHandler;

    private final RouteLocator routeLocator;

    private final Integer managementPort;

    private final ManagementPortType managementPortType;

    // 网关启动时进行了初始化
    public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
            RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
            Environment environment) {
        this.webHandler = webHandler;
        this.routeLocator = routeLocator;

        this.managementPort = getPortProperty(environment, "management.server.");
        this.managementPortType = getManagementPortType(environment);
        setOrder(1);
        setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
    }

    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    // 返回FilteringWebHandler
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for ["
                                + getExchangeDesc(exchange) + "]");
                    }
                })));
    }
}
  1. RoutePredicateHandlerMapping#lookupRoute 匹配路由,根据 routeLocator 获取我们在配置我文件中配置的 Route,和当前请求的路由做匹配;
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {

        // routeLocator获取我们在配置我文件中配置的Route
        return this.routeLocator.getRoutes()
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    // 当前请求的路由做匹配
                    return r.getPredicate().apply(exchange);
                })
                        .doOnError(e -> logger.error(
                                "Error applying predicate for route: " + route.getId(),
                                e))
                        .onErrorResume(e -> Mono.empty()))
                .next()
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });
    }
}
  1. FilteringWebHandler 创建过滤器链,执行过滤器;
public class FilteringWebHandler implements WebHandler {

    // 创建过滤器链
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

    private static class DefaultGatewayFilterChain implements GatewayFilterChain {
        // 调用过滤器
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                            this.index + 1);
                    // 执行调用
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }
    }
}

作者:白菜说技术
链接:https://juejin.cn/post/7185431048774221861
来源:稀土掘金

上一篇下一篇

猜你喜欢

热点阅读