The default filters in Spring Cloud Gateway
2018-07-28
giafei
Applies to
Spring Cloud Gateway 2.0.0.RELEASE
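How to debug
Create a new GlobalFilter and set a breakpoint inside its filter method. From there you can step through the filter chain, and the chain parameter lets you inspect the other filters and their execution order (order).
Filters (in execution order)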
1. AdaptCachedBodyGlobalFilter
Core code
public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE + 1000;
}
public static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Flux<DataBuffer> body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_KEY, null);
    if (body != null) {
        // A cached body exists: decorate the request so getBody() returns it
        ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                return body;
            }
        };
        return chain.filter(exchange.mutate().request(decorator).build());
    }
    return chain.filter(exchange);
}
Provides the ability to replace the request body: if a body has been stored under CACHED_REQUEST_BODY_KEY, downstream filters read that body instead of the original one.
2. NettyWriteResponseFilter
Core code
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
public int getOrder() {
    return WRITE_RESPONSE_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange).then(Mono.defer(() -> {
        // Set by NettyRoutingFilter, described later in this post
        HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
        ServerHttpResponse response = exchange.getResponse();
        NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
        final Flux<NettyDataBuffer> body = clientResponse.receive()
                .map(factory::wrap);
        MediaType contentType = response.getHeaders().getContentType();
        return (isStreamingMediaType(contentType) ?
                response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
    }));
}
The class that actually writes the proxied service's response back to the client. Its work runs after the rest of the chain completes, which is why it reads the attribute that NettyRoutingFilter sets (see the documentation).
3. ForwardPathFilter
Core code
public int getOrder() {
    return 0;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    URI routeUri = route.getUri();
    String scheme = routeUri.getScheme();
    if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
        return chain.filter(exchange);
    }
    // Replace the request path with the path of the forward: route URI
    exchange = exchange.mutate().request(
            exchange.getRequest().mutate().path(routeUri.getPath()).build())
            .build();
    return chain.filter(exchange);
}
Rewrites the request path for routes that use the forward scheme.
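The path replacement above can be sketched with plain java.net.URI. This is only an illustration under stated assumptions: it does not use Spring's ServerHttpRequest, and the class name ForwardPathSketch, the method applyForwardPath, and the example hosts are all hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative sketch only: imitates the path replacement with java.net.URI.
// ForwardPathSketch and applyForwardPath are hypothetical names, not gateway APIs.
final class ForwardPathSketch {

    // Returns the request URI with its path replaced by the route's path when the
    // route uses the "forward" scheme; otherwise returns the request URI unchanged.
    static URI applyForwardPath(URI requestUri, URI routeUri) {
        if (!"forward".equals(routeUri.getScheme())) {
            return requestUri;
        }
        try {
            return new URI(requestUri.getScheme(), requestUri.getAuthority(),
                    routeUri.getPath(), requestUri.getQuery(), requestUri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI request = URI.create("http://gateway:8080/orders/1?x=1");
        // forward route: only the path changes
        System.out.println(applyForwardPath(request, URI.create("forward:///fallback")));
        // non-forward route: request is left alone
        System.out.println(applyForwardPath(request, URI.create("http://backend:9000")));
    }
}
```

Only the path is taken from the route URI; the scheme, authority and query of the incoming request are kept in this sketch.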
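4. The GatewayFilters configured on the Route
Core code
/**
 * RouteDefinitionRouteLocator#loadGatewayFilters: how each GatewayFilter gets its order
 */
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
    GatewayFilter gatewayFilter = filters.get(i);
    if (gatewayFilter instanceof Ordered) {
        // The filter declares its own order: keep it
        ordered.add(gatewayFilter);
    }
    else {
        // Otherwise the order is the 1-based position in the route configuration
        ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
    }
}
return ordered;
What each of these filters does depends on the route configuration; see the documentation for details.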
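To make the ordering rule concrete, here is a minimal stand-alone sketch of the same loop. The Ordered and Filter interfaces and the PlainFilter, SelfOrderedFilter and OrderedWrapper classes are hypothetical stand-ins defined only for this example; they are not the Spring types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: all types here are local stand-ins, not Spring classes.
final class RouteFilterOrderSketch {

    interface Ordered { int getOrder(); }
    interface Filter { }

    // A filter with no order of its own.
    static final class PlainFilter implements Filter { }

    // A filter that declares its own order.
    static final class SelfOrderedFilter implements Filter, Ordered {
        private final int order;
        SelfOrderedFilter(int order) { this.order = order; }
        @Override public int getOrder() { return order; }
    }

    // Wraps an unordered filter and gives it an explicit order.
    static final class OrderedWrapper implements Filter, Ordered {
        final Filter delegate;
        private final int order;
        OrderedWrapper(Filter delegate, int order) { this.delegate = delegate; this.order = order; }
        @Override public int getOrder() { return order; }
    }

    // Mirrors the loop above and returns the resulting order of each filter.
    static List<Integer> assignOrders(List<Filter> filters) {
        List<Integer> orders = new ArrayList<>(filters.size());
        for (int i = 0; i < filters.size(); i++) {
            Filter f = filters.get(i);
            Ordered ordered = (f instanceof Ordered) ? (Ordered) f : new OrderedWrapper(f, i + 1);
            orders.add(ordered.getOrder());
        }
        return orders;
    }

    public static void main(String[] args) {
        List<Filter> filters = Arrays.asList(
                new PlainFilter(), new SelfOrderedFilter(-5), new PlainFilter());
        System.out.println(assignOrders(filters)); // prints [1, -5, 3]
    }
}
```

Note that the position counter still advances past a self-ordered filter, so the third filter receives order 3, not 2.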
5. RouteToRequestUrlFilter
Core code
public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;
public int getOrder() {
    return ROUTE_TO_URL_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    if (route == null) {
        return chain.filter(exchange);
    }
    URI uri = exchange.getRequest().getURI();
    boolean encoded = containsEncodedParts(uri);
    URI routeUri = route.getUri();
    // Matches e.g. http:http://localhost:80/a/b/c?q=1 and strips the first "http:"
    if (hasAnotherScheme(routeUri)) {
        // URI format: [scheme:]scheme-specific-part[#fragment]
        exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
        routeUri = URI.create(routeUri.getSchemeSpecificPart());
    }
    URI requestUrl = UriComponentsBuilder.fromUri(uri)
            .uri(routeUri)
            .build(encoded)
            .toUri();
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}
private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";
static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);
static boolean hasAnotherScheme(URI uri) {
    return schemePattern.matcher(uri.getSchemeSpecificPart()).matches() && uri.getHost() == null
            && uri.getRawPath() == null;
}
The class that actually performs the routing step: it merges the route URI into the request URI and stores the result in GATEWAY_REQUEST_URL_ATTR for the later filters (see the documentation).
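The scheme-prefix check is easy to try in isolation, because it only relies on java.net.URI and a regular expression. The sketch below reuses the regex and the hasAnotherScheme logic quoted above; the class name SchemePrefixSketch, the helper stripPrefix and the sample URIs are assumptions made for the example.

```java
import java.net.URI;
import java.util.regex.Pattern;

// Illustrative sketch only: SchemePrefixSketch and stripPrefix are hypothetical names.
final class SchemePrefixSketch {

    private static final Pattern SCHEME_PATTERN =
            Pattern.compile("[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*");

    // True when the scheme-specific part itself starts with a scheme, e.g. "lb:ws://host".
    // Such a URI is opaque for java.net.URI, so both host and raw path are null.
    static boolean hasAnotherScheme(URI uri) {
        return SCHEME_PATTERN.matcher(uri.getSchemeSpecificPart()).matches()
                && uri.getHost() == null
                && uri.getRawPath() == null;
    }

    // Drops the outer scheme when there is a nested one, otherwise returns the URI as is.
    static URI stripPrefix(URI routeUri) {
        return hasAnotherScheme(routeUri)
                ? URI.create(routeUri.getSchemeSpecificPart())
                : routeUri;
    }

    public static void main(String[] args) {
        System.out.println(hasAnotherScheme(URI.create("lb:ws://service-name")));  // true
        System.out.println(hasAnotherScheme(URI.create("lb://service-name")));     // false
        System.out.println(stripPrefix(URI.create("http:http://localhost:80/a/b/c?q=1")));
    }
}
```

A plain lb://service-name URI is hierarchical, so its scheme-specific part starts with "//" and the regex does not match; only the doubled form such as lb:ws://service-name is treated as carrying a prefix.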
6. LoadBalancerClientFilter (if Eureka is enabled)
Core code
public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
public int getOrder() {
    return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
    String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
    if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
        return chain.filter(exchange);
    }
    // A series of conversions follows
    addOriginalRequestUrl(exchange, url);
    final ServiceInstance instance = loadBalancer.choose(url.getHost());
    if (instance == null) {
        throw new NotFoundException("Unable to find instance for " + url.getHost());
    }
    URI uri = exchange.getRequest().getURI();
    String overrideScheme = null;
    if (schemePrefix != null) {
        overrideScheme = url.getScheme();
    }
    URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
    // Store the converted URL back into the GATEWAY_REQUEST_URL_ATTR attribute
    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
    return chain.filter(exchange);
}
Routing for the lb scheme: the service name in the URL is resolved to a concrete instance chosen by the load balancer (see the documentation).
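The guard at the top of the filter decides whether load balancing applies at all. A minimal sketch of that decision, using only java.net.URI, might look like this; the class name LbDecisionSketch, the method shouldLoadBalance and the service names are hypothetical.

```java
import java.net.URI;

// Illustrative sketch only: isolates the guard condition of the filter above.
// LbDecisionSketch and shouldLoadBalance are hypothetical names.
final class LbDecisionSketch {

    // Load balancing applies when the URL scheme is "lb", or when an "lb" prefix
    // was stripped earlier and remembered as schemePrefix (e.g. from "lb:ws://orders").
    static boolean shouldLoadBalance(URI url, String schemePrefix) {
        if (url == null) {
            return false;
        }
        return "lb".equals(url.getScheme()) || "lb".equals(schemePrefix);
    }

    public static void main(String[] args) {
        System.out.println(shouldLoadBalance(URI.create("lb://orders"), null));   // true
        System.out.println(shouldLoadBalance(URI.create("ws://orders"), "lb"));   // true
        System.out.println(shouldLoadBalance(URI.create("http://orders"), null)); // false
    }
}
```

The second case is the one that depends on RouteToRequestUrlFilter having stored the stripped prefix beforehand.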
7. WebsocketRoutingFilter
Core code
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE - 1;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // For the Upgrade header, see https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism
    // or https://httpwg.org/specs/rfc7230.html#header.upgrade
    changeSchemeIfIsWebSocketUpgrade(exchange);
    // Parameter checks and lookups (requestUrl, filtered, protocols) omitted
    return this.webSocketService.handleRequest(exchange,
            new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
                    filtered, protocols));
}
/**
 * ProxyWebSocketHandler#handle bridges the two WebSocket sessions
 */
public Mono<Void> handle(WebSocketSession session) { // session is the client side
    return client.execute(url, this.headers, new WebSocketHandler() {
        @Override
        public Mono<Void> handle(WebSocketSession proxySession) { // proxySession is the proxied (upstream) WebSocket
            // Forward client messages upstream and upstream messages back to the client
            Mono<Void> proxySessionSend = proxySession
                    .send(session.receive().doOnNext(WebSocketMessage::retain));
            Mono<Void> serverSessionSend = session
                    .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
            return Mono.zip(proxySessionSend, serverSessionSend).then();
        }
        // Other methods omitted
    });
}
Proxies WebSocket connections (see the documentation).
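The excerpt does not show the body of changeSchemeIfIsWebSocketUpgrade. As a rough sketch of what such a step plausibly does, assuming it maps http to ws and https to wss when the request carries an Upgrade: websocket header, consider the following. The class WebSocketSchemeSketch and the method toWebSocketUri are hypothetical and do not reproduce the gateway's actual code.

```java
import java.net.URI;
import java.util.Locale;

// Illustrative sketch only, under the assumption stated in the text above:
// http -> ws and https -> wss when the Upgrade header asks for WebSocket.
// WebSocketSchemeSketch and toWebSocketUri are hypothetical names.
final class WebSocketSchemeSketch {

    static URI toWebSocketUri(URI requestUrl, String upgradeHeader) {
        String scheme = requestUrl.getScheme();
        if (scheme == null || !"websocket".equalsIgnoreCase(upgradeHeader)) {
            return requestUrl; // not an upgrade request: leave the URL untouched
        }
        String wsScheme;
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "http":  wsScheme = "ws";  break;
            case "https": wsScheme = "wss"; break;
            default:      return requestUrl; // already ws/wss or something else
        }
        // Swap only the scheme and keep the rest of the URL as is
        return URI.create(wsScheme + requestUrl.toString().substring(scheme.length()));
    }

    public static void main(String[] args) {
        System.out.println(toWebSocketUri(URI.create("http://backend:9000/chat"), "websocket"));
        System.out.println(toWebSocketUri(URI.create("https://backend:9000/chat"), "WebSocket"));
        System.out.println(toWebSocketUri(URI.create("http://backend:9000/chat"), null));
    }
}
```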
8. NettyRoutingFilter
Core code
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // Parameter lookups and validation omitted
    final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
    final String url = requestUrl.toString();
    return this.httpClient.request(method, url, req -> {
        // Code that sends the HTTP request data omitted
    }).doOnNext(res -> {
        ServerHttpResponse response = exchange.getResponse();
        HttpHeaders headers = new HttpHeaders();
        res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
        // Note: this throws an NPE when the Content-Type is null, which is typical of 301 or 302 redirects
        exchange.getAttributes().put("original_response_content_type", headers.getContentType());
        // Other HTTP response handling omitted
        exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); // read by NettyWriteResponseFilter, described earlier
    }).then(chain.filter(exchange));
}
Proxies HTTP requests to the target service (see the documentation).
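The NPE noted in the comment can be reproduced without the gateway, assuming the exchange attributes are backed by a ConcurrentHashMap (as they are in Spring's default ServerWebExchange implementation, to my knowledge), which rejects null values. The sketch below shows the failure and one possible guard; the class NullAttributeSketch and both helper methods are hypothetical and are not a patch to the gateway.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: NullAttributeSketch and its helpers are hypothetical names.
final class NullAttributeSketch {

    // Returns true if storing a null value under the key throws a NullPointerException.
    static boolean putRejectsNull(Map<String, Object> attributes, String key) {
        try {
            attributes.put(key, null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // A possible guard: only store the attribute when a value is actually present.
    static void putIfPresent(Map<String, Object> attributes, String key, Object value) {
        if (value != null) {
            attributes.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new ConcurrentHashMap<>();
        // ConcurrentHashMap does not permit null values
        System.out.println(putRejectsNull(attributes, "original_response_content_type")); // true
        // With the guard, a missing Content-Type simply leaves the attribute unset
        putIfPresent(attributes, "original_response_content_type", null);
        System.out.println(attributes.containsKey("original_response_content_type"));      // false
    }
}
```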
9. ForwardRoutingFilter
Core code
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    String scheme = requestUrl.getScheme();
    if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
        return chain.filter(exchange);
    }
    setAlreadyRouted(exchange);
    if (log.isTraceEnabled()) {
        log.trace("Forwarding to URI: "+requestUrl);
    }
    return this.dispatcherHandler.handle(exchange);
}
Hands forward-scheme requests that have not yet been routed over to Spring's DispatcherHandler (see the documentation).
Of these, NettyRoutingFilter and NettyWriteResponseFilter each have a built-in alternative, WebClientHttpRoutingFilter and WebClientWriteResponseFilter respectively, which can be used as replacements.
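The execution order used throughout this post follows from sorting the getOrder() values quoted in each section, smallest first. The sketch below redoes that sort with plain Java. It assumes Ordered.HIGHEST_PRECEDENCE is Integer.MIN_VALUE and Ordered.LOWEST_PRECEDENCE is Integer.MAX_VALUE, uses two hypothetical route filters with orders 1 and 2, and the class name FilterOrderSketch is made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: sorts the order values quoted in this post.
// FilterOrderSketch and Entry are hypothetical names.
final class FilterOrderSketch {

    static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE; // as in Spring's Ordered
    static final int LOWEST_PRECEDENCE = Integer.MAX_VALUE;  // as in Spring's Ordered

    static final class Entry {
        final String name;
        final int order;
        Entry(String name, int order) { this.name = name; this.order = order; }
    }

    // Returns the filter names sorted by ascending order value (stable for ties).
    static List<String> sortedNames() {
        List<Entry> entries = new ArrayList<>();
        // Deliberately added out of order
        entries.add(new Entry("NettyRoutingFilter", LOWEST_PRECEDENCE));
        entries.add(new Entry("ForwardRoutingFilter", LOWEST_PRECEDENCE));
        entries.add(new Entry("LoadBalancerClientFilter", 10100));
        entries.add(new Entry("route filter #2", 2));
        entries.add(new Entry("NettyWriteResponseFilter", -1));
        entries.add(new Entry("WebsocketRoutingFilter", LOWEST_PRECEDENCE - 1));
        entries.add(new Entry("route filter #1", 1));
        entries.add(new Entry("RouteToRequestUrlFilter", 10000));
        entries.add(new Entry("ForwardPathFilter", 0));
        entries.add(new Entry("AdaptCachedBodyGlobalFilter", HIGHEST_PRECEDENCE + 1000));

        entries.sort(Comparator.comparingInt((Entry e) -> e.order));

        List<String> names = new ArrayList<>();
        for (Entry e : entries) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        sortedNames().forEach(System.out::println);
    }
}
```

NettyRoutingFilter and ForwardRoutingFilter share the same order value, so their relative position in this sketch only reflects insertion order.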