Gateway Source Code Analysis

2023-03-17  王侦

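In a microservice architecture, each service runs independently and handles one specific domain, and services talk to each other over REST APIs or RPC. A single front-end request, such as viewing a product's details, may require the client to call several microservices: the product service, the inventory service, the review service, and so on. If the client talks to each microservice directly, this complex call flow causes a number of problems.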

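A gateway solves these problems. It is the single entry point through which the microservice architecture exposes its capabilities; in essence, it forwards requests and applies pre- and post-filtering to them.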

Common API gateway implementations: OpenResty (Nginx + Lua), Zuul (Netflix), Gateway (Spring), and Kong.

Spring Cloud already had Zuul, so why did Spring develop Gateway from scratch?

The key components are:

0. Auto-configuration classes

GatewayAutoConfiguration

GatewayReactiveLoadBalancerClientAutoConfiguration

GatewayLoadBalancerClientAutoConfiguration

1.NettyWebServer

ReactiveWebServerApplicationContext#onRefresh

1.1 Configuration

NettyReactiveWebServerFactory#createHttpServer

    private HttpServer createHttpServer() {
        HttpServer server = HttpServer.create();
        if (this.resourceFactory != null) {
            LoopResources resources = this.resourceFactory.getLoopResources();
            Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
            server = server
                    .tcpConfiguration((tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress));
        }
        else {
            server = server.tcpConfiguration((tcpServer) -> tcpServer.bindAddress(this::getListenAddress));
        }
        if (getSsl() != null && getSsl().isEnabled()) {
            SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(getSsl(), getHttp2(),
                    getSslStoreProvider());
            server = sslServerCustomizer.apply(server);
        }
        if (getCompression() != null && getCompression().getEnabled()) {
            CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
            server = compressionCustomizer.apply(server);
        }
        server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
        return applyCustomizers(server);
    }

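The server is configured through layer-upon-layer nesting: each call wraps the HttpServer produced by the previous one.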
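
To make the nesting idea concrete, here is a minimal plain-Java sketch. It does not use Reactor Netty; `TcpSettings` and `LayeredServer` are hypothetical names invented for illustration. Each `configure` call returns a new wrapper around the previous server, and when the settings are finally resolved, the innermost layer is evaluated first and the outer layers are applied on top of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for low-level TCP settings; not a Reactor Netty type. */
final class TcpSettings {
    final List<String> applied;

    TcpSettings(List<String> applied) {
        this.applied = List.copyOf(applied);
    }

    /** Returns a copy with one more setting recorded. */
    TcpSettings with(String setting) {
        List<String> next = new ArrayList<>(applied);
        next.add(setting);
        return new TcpSettings(next);
    }
}

/** Hypothetical immutable server: every configure() call wraps the previous instance. */
class LayeredServer {
    /** The base layer starts from empty settings. */
    TcpSettings resolve() {
        return new TcpSettings(List.of());
    }

    /** Wraps this server in a new layer instead of mutating it. */
    final LayeredServer configure(UnaryOperator<TcpSettings> mapper) {
        LayeredServer inner = this;
        return new LayeredServer() {
            @Override
            TcpSettings resolve() {
                // Resolve the wrapped (inner) layer first, then apply this layer's mapper.
                return mapper.apply(inner.resolve());
            }
        };
    }
}

final class LayeredServerDemo {
    public static void main(String[] args) {
        LayeredServer server = new LayeredServer()
                .configure(s -> s.with("bindAddress"))
                .configure(s -> s.with("ssl"))
                .configure(s -> s.with("compression"));
        System.out.println(server.resolve().applied); // [bindAddress, ssl, compression]
    }
}
```

Nothing is evaluated while the layers are being stacked; the mappers only run when `resolve()` is called on the outermost layer.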

1.2 Creating the NettyWebServer

NettyReactiveWebServerFactory#getWebServer

    public WebServer getWebServer(HttpHandler httpHandler) {
        HttpServer httpServer = createHttpServer();
        ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        NettyWebServer webServer = new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout, getShutdown());
        webServer.setRouteProviders(this.routeProviders);
        return webServer;
    }

Here another layer of configuration is added on top of httpServer:

1.3 Startup

AbstractApplicationContext#finishRefresh

WebServerManager#start

    void start() {
        this.handler.initializeHandler();
        this.webServer.start();
        this.applicationContext
                .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
    }

1.3.1 handler

Let's look at the handler:

1.3.2 Starting the server

NettyWebServer#startHttpServer adds yet another wrapping layer.

First, look at tcpConfiguration().

Its logic executes from the innermost layer outward.

What is finally returned is:


Next, look at bind().

BootstrapInitializerHandler carries the following properties, BootstrapInitializerHandler(pipeline, opsFactory, childListener):


1.4 pipeline

DefaultChannelPipeline

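2. Core processing flow

The core of the gateway is the Filter and the Filter Chain. A client sends a request to Spring Cloud Gateway; the Gateway Handler Mapping finds the route that matches the request and passes the request to the Gateway Web Handler. The handler then runs the request through that route's filter chain, which forwards it to the actual service that executes the business logic, and the response is returned. The filters are shown divided by a dotted line because a filter may run logic before (pre) or after (post) the proxy request is sent.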
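
The pre/post behavior can be sketched with a small synchronous chain in plain Java. This is only an illustration under simplifying assumptions: the real GatewayFilter works reactively on a ServerWebExchange, and `SketchFilter`, `SketchChain`, and `FilterChainDemo` are hypothetical names. Each filter does its "pre" work, hands over to the rest of the chain, and does its "post" work once the rest of the chain has returned.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, synchronous stand-in for a gateway filter. */
interface SketchFilter {
    void filter(List<String> trace, SketchChain chain);
}

/** Index-based chain: each filter() call hands over to the next filter in the list. */
final class SketchChain {
    private final List<SketchFilter> filters;
    private final int index;

    SketchChain(List<SketchFilter> filters) {
        this(filters, 0);
    }

    private SketchChain(List<SketchFilter> filters, int index) {
        this.filters = filters;
        this.index = index;
    }

    void filter(List<String> trace) {
        if (index < filters.size()) {
            filters.get(index).filter(trace, new SketchChain(filters, index + 1));
        }
        // Past the last filter there is nothing left to call, so the call stack unwinds.
    }
}

final class FilterChainDemo {
    /** Builds a filter that records a "pre" step, delegates, then records a "post" step. */
    static SketchFilter named(String name) {
        return (trace, chain) -> {
            trace.add(name + ":pre");
            chain.filter(trace);
            trace.add(name + ":post");
        };
    }

    /** Last filter in the list; stands in for the proxied call to the downstream service. */
    static SketchFilter routing() {
        return (trace, chain) -> {
            trace.add("proxy-call");
            chain.filter(trace);
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new SketchChain(List.of(named("A"), named("B"), routing())).filter(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A:pre, B:pre, proxy-call, B:post, A:post]
    }
}
```

The post steps run in the reverse order of the pre steps, which is why one sorted list of filters is enough to describe both directions.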

2.1 DispatcherHandler#handle

ChannelOperationsHandler#channelRead

    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }

2.2 mapping#getHandler

There are four HandlerMappings.

The one that matters here is RoutePredicateHandlerMapping.

2.2.1 lookupRoute

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes().concatMap((route) -> {
            return Mono.just(route).filterWhen((r) -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                return (Publisher)r.getPredicate().apply(exchange);
            }).doOnError((e) -> {
                this.logger.error("Error applying predicate for route: " + route.getId(), e);
            }).onErrorResume((e) -> {
                return Mono.empty();
            });
        }).next().map((route) -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Route matched: " + route.getId());
            }

            this.validateRoute(route, exchange);
            return route;
        });
    }
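
The matching logic above can be approximated without Reactor. The following plain-Java sketch assumes a route is just an id plus a predicate over the request path; `SketchRoute` and `RouteLookup` are hypothetical names. It mirrors two properties of lookupRoute: routes are tried in order and the first match wins, and a predicate that throws is logged and treated as a non-match rather than failing the whole lookup.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical route: an id plus a predicate over the request path. */
final class SketchRoute {
    final String id;
    final Predicate<String> predicate;

    SketchRoute(String id, Predicate<String> predicate) {
        this.id = id;
        this.predicate = predicate;
    }
}

final class RouteLookup {
    /** Returns the first route whose predicate accepts the path, in declaration order. */
    static Optional<SketchRoute> lookup(List<SketchRoute> routes, String path) {
        return routes.stream()
                .filter(route -> matches(route, path))
                .findFirst(); // the stream is lazy, so routes after the first match are not tested
    }

    /** A predicate that throws is logged and treated as "no match". */
    private static boolean matches(SketchRoute route, String path) {
        try {
            return route.predicate.test(path);
        } catch (RuntimeException e) {
            System.err.println("Error applying predicate for route: " + route.id);
            return false;
        }
    }

    public static void main(String[] args) {
        List<SketchRoute> routes = List.of(
                new SketchRoute("broken", p -> { throw new IllegalStateException("boom"); }),
                new SketchRoute("orders", p -> p.startsWith("/orders")),
                new SketchRoute("catch-all", p -> true));
        System.out.println(lookup(routes, "/orders/1").map(r -> r.id).orElse("none")); // orders
    }
}
```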

RouteDefinitionRouteLocator#getRoutes

Mono.just(route).filterWhen((r)

2.2.2 Binding the route to the request context (exchange)

exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, r)

2.3 invokeHandler(exchange, handler)

DispatcherHandler#invokeHandler

    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

The adapter that matches is SimpleHandlerAdapter.

FilteringWebHandler#handle

    public Mono<Void> handle(ServerWebExchange exchange) {
        // The matched route was stored on the exchange under GATEWAY_ROUTE_ATTR (see 2.2.2).
        Route route = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();
        // Merge the global filters with the route's own filters, then sort them by order.
        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        AnnotationAwareOrderComparator.sort(combined);
        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

The filters after sorting:


GlobalFilter -> GatewayFilter, via the adapter pattern

The global filter chain
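
As a rough illustration of the adapter step and the sorting, here is a plain-Java sketch. The interfaces are simplified to a string-in, string-out `filter` method, and all names (`SketchGlobalFilter`, `SketchRouteFilter`, `GlobalFilterAdapter`, `CombinedFilterDemo`) as well as the order values are assumptions made for the example, not Spring Cloud Gateway types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simplified contracts; the real interfaces operate on a ServerWebExchange. */
interface SketchGlobalFilter {
    String filter(String request);
    int getOrder();
}

interface SketchRouteFilter {
    String filter(String request);
    int getOrder();
}

/** Adapter: lets a global filter be used wherever a route-level filter is expected. */
final class GlobalFilterAdapter implements SketchRouteFilter {
    private final SketchGlobalFilter delegate;

    GlobalFilterAdapter(SketchGlobalFilter delegate) {
        this.delegate = delegate;
    }

    @Override
    public String filter(String request) {
        return delegate.filter(request);
    }

    @Override
    public int getOrder() {
        return delegate.getOrder();
    }
}

final class CombinedFilterDemo {
    static SketchGlobalFilter global(String name, int order) {
        return new SketchGlobalFilter() {
            @Override public String filter(String request) { return request + ">" + name; }
            @Override public int getOrder() { return order; }
        };
    }

    static SketchRouteFilter routeLevel(String name, int order) {
        return new SketchRouteFilter() {
            @Override public String filter(String request) { return request + ">" + name; }
            @Override public int getOrder() { return order; }
        };
    }

    /** Adapts the global filters, appends the route's own filters, and sorts by order. */
    static List<SketchRouteFilter> combine(List<SketchGlobalFilter> globals,
                                           List<SketchRouteFilter> routeFilters) {
        List<SketchRouteFilter> combined = new ArrayList<>();
        for (SketchGlobalFilter g : globals) {
            combined.add(new GlobalFilterAdapter(g));
        }
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt(SketchRouteFilter::getOrder));
        return combined;
    }

    static String run() {
        List<SketchRouteFilter> chain = combine(
                List.of(global("routing", Integer.MAX_VALUE), global("write-response", -1)),
                List.of(routeLevel("add-header", 1)));
        String request = "req";
        for (SketchRouteFilter f : chain) {
            request = f.filter(request);
        }
        return request;
    }

    public static void main(String[] args) {
        System.out.println(run()); // req>write-response>add-header>routing
    }
}
```

Because the adapter exposes the same interface as a route-level filter, the chain can treat both kinds uniformly once they sit in one sorted list.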

2.4 handleResult

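During debugging, execution never reached this step. A likely reason is that SimpleHandlerAdapter completes with an empty Mono once FilteringWebHandler finishes, so no HandlerResult is emitted for handleResult to process.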

2.5 Post filters

Question: after the downstream microservice returns its response, which filter chain does Gateway run?

NettyWriteResponseFilter#filter

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }
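
The shape of this filter, with no "pre" work and all of its logic attached to the completion of the rest of the chain, can be imitated with CompletableFuture. This is only an analogy: CompletableFuture starts eagerly while Mono is lazy, and `WriteResponseSketch` and its method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class WriteResponseSketch {
    /** Stand-in for the rest of the chain, which ends with the proxied call. */
    static CompletableFuture<Void> restOfChain(List<String> trace) {
        return CompletableFuture.runAsync(() -> trace.add("proxied call finished"));
    }

    /** No "pre" work: the only logic runs after the rest of the chain has completed. */
    static CompletableFuture<Void> writeResponseFilter(List<String> trace) {
        return restOfChain(trace)
                .thenRun(() -> trace.add("write response body to client"));
    }

    static List<String> run() {
        List<String> trace = new CopyOnWriteArrayList<>();
        writeResponseFilter(trace).join(); // block only for the sake of the demo
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [proxied call finished, write response body to client]
    }
}
```

Read this way, the response does not go through a separate chain: it is handled by the "post" halves of the same filters, as each one's continuation fires after the filters behind it complete.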
