Spring-cloundSpring Cloud

Spring Cloud Gateway--请求流程解析

2020-12-24  本文已影响0人  NealLemon

前言

其实一开始想自己总结一下Spring Cloud Gateway的路由流程,但是在看源码的过程中,发现很多类都是提前在Netty启动时已经配置或者是已经声明好的,如果直接开始总结Spring Cloud Gateway 的路由流程,不但有点突兀,等过段时间我自己回来看的时候,可能也记不得了,所以先总结了总结了一下SpringBoot内置Netty的启动流程,这样调用的类和其中的方法,我们就可以知道到底是在何时声明,从底层一步一步理解Spring Cloud Gateway路由流程。

简单介绍Spring Cloud Gateway

根据官方的文档 的我们大体可以了解到Spring Cloud Gateway 大体上由三部分组成

官方的工方式如图

gateway大体流程图.png

上图大体上可以理解为:

客户端向Spring Cloud Gateway发出请求。 如果Gateway Handler Mapping 通过断言的集合确定请求与路由匹配,则将其发送到Gateway Web Handler。 Gateway Web Handler 通过确定的路由中所配置的过滤器集合链式调用过滤器(也就是所谓的责任链模式)。 Filter由虚线分隔的原因是, Filter可以在发送代理请求之前和之后运行逻辑。处理的逻辑是 在处理请求时 排在前面的过滤器先执行,而处理返回相应的时候,排在后面的过滤器先执行。

Spring Cloud Gateway 详细工作流程

首先先来一张大体流程调用图,自己用软件画的,卖相不是很好看,但是大体流程都有,大家可以跟着图去debugger一下,我觉得就可以搞明白整体流程了。

gateway流程图.jpg

首先客户端发送HTTP请求进来后,先经过的是 ReactorHttpHandlerAdapter#apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) 方法,然后通过在Netty启动时配置好的HttpHandler 进行HTTP请求处理,这里的HttpHandler 指的是 HttpWebHandlerAdapter 。这里涉及到的几个类 都是在启动时就已经注入了,可以看一下我之前的总结 SpringBoot-内置Netty启动(一) 。 我们可以看一下源码。

    @Override
    public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
        //获取请求处理的内存空间
        NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
        try {
            ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
            ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

            if (request.getMethod() == HttpMethod.HEAD) {
                response = new HttpHeadResponseDecorator(response);
            }
            //HttpWebHandlerAdapter 去处理请求 
            return this.httpHandler.handle(request, response)
                    .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
                    .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
        }
        catch (URISyntaxException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to get request URI: " + ex.getMessage());
            }
            reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
            return Mono.empty();
        }
    }

这里需要注意的是 HttpWebHandlerAdapter其实是个代理类,他层层代理 最终指向的是 DispatcherHandler 如图所示

DispatcherHandler请求处理.png

感兴趣的同学可以按照上面给出的请求流程图去自己debugger每一个代理类,需要注意的是 这里的FilteringWebHandlerorg.springframework.web.server.handler包中的 而不是 Spring Cloud Gateway中的handler。

接下来我们来主要看一下 DispatcherHandler#handle(ServerWebExchange exchange) 这个方法,也是整个WebFlux请求处理的核心方法。

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return 
            //创建Flux流操作
            Flux.fromIterable(this.handlerMappings)  
            //流式执行 getHandler方法
                .concatMap(mapping -> mapping.getHandler(exchange))
            //将流操作中返回的第一个项发送到新的Mono中
                .next()
                .switchIfEmpty(createNotFoundError())
            //从获取到的handler中执行 invokeHandler(exchange, handler)
                .flatMap(handler -> invokeHandler(exchange, handler))
            //从获取到的handler获取到result执行  handleResult(exchange, result)
                .flatMap(result -> handleResult(exchange, result));
    }

我们来看一下 handlerMappings 集合中有哪几个 HandlerMapping的实现类

Mappings.png

我们这里主要是看 RoutePredicateHandlerMapping 首先调用了父类 AbstractHandlerMapping#getHandler(ServerWebExchange exchange),在父类方法中调用了子类的getHandlerInternal(ServerWebExchange exchange) 方法后做了跨域等逻辑之后返回得到的Handler对象。

//AbstractHandlerMapping 
    @Override
    public Mono<Object> getHandler(ServerWebExchange exchange) {
        return getHandlerInternal(exchange).map(handler -> {
            if (logger.isDebugEnabled()) {
                logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
            }
            ServerHttpRequest request = exchange.getRequest();
            if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
                CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
                CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
                config = (config != null ? config.combine(handlerConfig) : handlerConfig);
                if (config != null) {
                    config.validateAllowCredentials();
                }
                if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                    return REQUEST_HANDLED_HANDLER;
                }
            }
            return handler;
        });
    }

//RequestMappingHandlerMapping
@Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
        //查找对应的route配置 
        return lookupRoute(exchange)
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    //如果找到则将Route 放到exchange的属性中
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    //返回 FilteringWebHandler
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                   //如果没有则不返回,即最后不会走 该类中的webHandler 去处理请求
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

    //查找路由的方法
    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
            //遍历定义好的路由
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    //遍历路由中定义好的断言集合
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    //断言校验,返回正确的路由
                    return r.getPredicate().apply(exchange);
                })
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                .next()
                .map(route -> {
                    //校验路由并返回
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });
    }

RoutePredicateHandlerMapping的处理逻辑大致上如上源码,重要的地方已经给出注释。获取到 FilteringWebHandler后,在执行 DispatcherHandler#invokeHandler

FilteringHandle.png

我们接着看一下 FilteringWebHandler#handle


    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        //获取到已经匹配好的路由
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        //获取路由中配置的过滤器
        List<GatewayFilter> gatewayFilters = route.getFilters();
        //获取全局的过滤器
        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        //合并并排序
        combined.addAll(gatewayFilters);
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }
        //执行过滤器链
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }


        //责任链模式调用 过滤器
        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }

Spring Cloud Gateway 详细的执行流程总结的差不多了,在上面的流程图中还有很多细节我没有说明,那是因为不是核心的流程代码,但是不等于不重要,如果有心的同学可以一起debug 再深入的了解你会发现在 路由断言的配置当中也用到了 非org.springframework.cloud.gateway包中的Filter。这种设计虽然不算巧妙,但是确实可以节省一些不必要的消耗。

上一篇下一篇

猜你喜欢

热点阅读