Spring Cloud 相关文章

Spring Cloud Gateway 默认的filter功能

2018-07-28  本文已影响6509人  giafei

有效性

Spring Cloud Gateway 2.0.0.RELEASE

调试方法

新建一个GlobalFilter,在filter中加断点即可调试filter,通过chain参数可以查看其它的filter及执行顺序(order)

filters(按执行顺序)

1. AdaptCachedBodyGlobalFilter

核心代码

public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE + 1000;
}

public static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    Flux<DataBuffer> body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_KEY, null);
    if (body != null) {
        ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                return body;
            }
        };
        return chain.filter(exchange.mutate().request(decorator).build());
    }

    return chain.filter(exchange);
}

提供替换request 的 body的能力

2.NettyWriteResponseFilter

核心代码

public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
public int getOrder() {
    return WRITE_RESPONSE_FILTER_ORDER;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange).then(Mono.defer(() -> {
        //见 后文的 NettyRoutingFilter
        HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

        ServerHttpResponse response = exchange.getResponse();
        NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
        
        final Flux<NettyDataBuffer> body = clientResponse.receive()
                .map(factory::wrap);

        MediaType contentType = response.getHeaders().getContentType();
        return (isStreamingMediaType(contentType) ?
                response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
    }));
}

具体的将被代理的服务的内容返回的类,文档

3.ForwardPathFilter

核心代码

public int getOrder() {
    return 0;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    URI routeUri = route.getUri();
    String scheme = routeUri.getScheme();
    if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
        return chain.filter(exchange);
    }
    exchange = exchange.mutate().request(
            exchange.getRequest().mutate().path(routeUri.getPath()).build())
            .build();
    return chain.filter(exchange);
}

forward协议的url替换类

4.在Route中配置的各种GatewayFilter

核心代码

/**
 * RouteDefinitionRouteLocator#loadGatewayFilters GatewayFilter的order
 */
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
    GatewayFilter gatewayFilter = filters.get(i);
    if (gatewayFilter instanceof Ordered) {
        ordered.add(gatewayFilter);
    }
    else {
        ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
    }
}

return ordered;

根据配置不同实现具体的功能,详见文档

5.RouteToRequestUrlFilter

核心代码

public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;
public int getOrder() {
    return ROUTE_TO_URL_FILTER_ORDER;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    if (route == null) {
        return chain.filter(exchange);
    }
    URI uri = exchange.getRequest().getURI();
    boolean encoded = containsEncodedParts(uri);
    URI routeUri = route.getUri();
    
    //匹配 http:http://locahost:80/a/b/c?q=1,并把第一个 http: 去掉
    if (hasAnotherScheme(routeUri)) {
        // uri格式 [scheme:]scheme-specific-part[#fragment]
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }

    URI requestUrl = UriComponentsBuilder.fromUri(uri)
            .uri(routeUri)
            .build(encoded)
            .toUri();
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}

private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";
static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);
static boolean hasAnotherScheme(URI uri) {
    return schemePattern.matcher(uri.getSchemeSpecificPart()).matches() && uri.getHost() == null
            && uri.getRawPath() == null;
}

路由功能的具体执行类,文档

6.LoadBalancerClientFilter(如果启用了eureka)

核心代码

public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
public int getOrder() {
    return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
        return chain.filter(exchange);
    }

    //一大波转换操作
    addOriginalRequestUrl(exchange, url);
    final ServiceInstance instance = loadBalancer.choose(url.getHost());

    if (instance == null) {
        throw new NotFoundException("Unable to find instance for " + url.getHost());
    }

    URI uri = exchange.getRequest().getURI();
    String overrideScheme = null;
    if (schemePrefix != null) {
        overrideScheme = url.getScheme();
    }

    URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
    
    //转换后的url填入 GATEWAY_REQUEST_URL_ATTR 属性
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}

lb协议的路由功能,文档

7.WebsocketRoutingFilter

核心代码

public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE - 1;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    //upgrade头 见https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism
    //或见 https://httpwg.org/specs/rfc7230.html#header.upgrade
    changeSchemeIfIsWebSocketUpgrade(exchange);

    //跳过一大波参数检查与参数获取

    return this.webSocketService.handleRequest(exchange,
            new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
                    filtered, protocols));
}

/**
 * ProxyWebSocketHandler#handle 桥接两个webSocket
 */
public Mono<Void> handle(WebSocketSession session) { //session为客户端
    return client.execute(url, this.headers, new WebSocketHandler() {
        @Override
        public Mono<Void> handle(WebSocketSession proxySession) {   //proxySession为被代理的WebSocket
            Mono<Void> proxySessionSend = proxySession
                    .send(session.receive().doOnNext(WebSocketMessage::retain));
            Mono<Void> serverSessionSend = session
                    .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
            return Mono.zip(proxySessionSend, serverSessionSend).then();
        }
        
        //省略其它方法
    });
} 

WebSocket的代理功能,文档

8.NettyRoutingFilter

核心代码

public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        //省略一大波参数获取和参数校验
        final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
        final String url = requestUrl.toString();

        return this.httpClient.request(method, url, req -> {
            //省略http数据发送代码
        }).doOnNext(res -> {
            ServerHttpResponse response = exchange.getResponse();
            
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
            //注意,如果ContentType为null会 NPE,特别是301或302跳转
            exchange.getAttributes().put("original_response_content_type", headers.getContentType());

            //省略其它http解析代码
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);    //与前面的 NettyWriteResponseFilter 对应
        }).then(chain.filter(exchange));
    }
} 

http协议的代理功能,文档

9.ForwardRoutingFilter

核心代码

public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

    String scheme = requestUrl.getScheme();
    if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
        return chain.filter(exchange);
    }
    setAlreadyRouted(exchange);

    if (log.isTraceEnabled()) {
        log.trace("Forwarding to URI: "+requestUrl);
    }

    return this.dispatcherHandler.handle(exchange);
}

将未处理的forward协议的请求交由spring来处理,文档

其中 NettyRoutingFilterNettyWriteResponseFilter 内置有 WebClientHttpRoutingFilterWebClientWriteResponseFilter 作为备用替换版本。

上一篇下一篇

猜你喜欢

热点阅读