Modifying Request and Response Bodies Elegantly in Spring Cloud Gateway

2021-08-19  yfgeek

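Modifying request and response bodies is a basic capability of any API gateway framework, yet in Spring Cloud Gateway rewriting a message body does not seem to be easy. Using version 3.0.3 as the example, this article shows how to modify request and response bodies elegantly in Spring Cloud Gateway.

1. The Official Approach

The official Spring Cloud Gateway documentation gives the following methods for reference:

1.1 Modifying the Request Body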

@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
            .filters(f -> f.prefixPath("/httpbin")
                .modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
                    (exchange, s) -> Mono.just(new Hello(s.toUpperCase())))).uri(uri))
        .build();
}

static class Hello {
    String message;

    public Hello() { }

    public Hello(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

1.2 Modifying the Response Body

@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("rewrite_response_upper", r -> r.host("*.rewriteresponseupper.org")
            .filters(f -> f.prefixPath("/httpbin")
                .modifyResponseBody(String.class, String.class,
                    (exchange, s) -> Mono.just(s.toUpperCase()))).uri(uri))
        .build();
}

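Of course, this approach has its limitations.

2. An Elegant Implementation

When I first started working with Spring Cloud Gateway, I wanted to modify request and response bodies by implementing a GlobalFilter myself, and I could not figure out how. It looks like a simple problem: in zuul1 you change a couple of variables and it is done. With the asynchronous, non-blocking Spring Cloud Gateway it felt like falling into a pit, where a single modification takes no fewer than 100 lines of code.

Many articles online show code that is redundant, complex, inelegant and hard to read, and that also fails to support HTTP 1.1 and Gzip, so it always feels like a hacky implementation. That left me puzzled: why should modifying request and response bodies be this much trouble in a full-fledged gateway?

Later, as I read further into the official documentation and source code, I came to understand that Spring Cloud Gateway does not seem to be intended as a gateway "framework". It is more like a ready-to-use gateway application in which every gateway-related setting can be supplied through configuration, with no custom coding or at most a few lightweight functional-style statements. That is genuinely good, and for a microservice gateway it is enough. But if you need to customize the gateway deeply, it becomes awkward: modifying such a thoroughly encapsulated tool from the outside, without touching the source of the packages it depends on, is like converting a luxury Ferrari into a Tesla, and the hacky methods found online always feel like wrapping it in two extra layers.

2.1 How It Works

To solve the elegance problem, the approach borrows the native rewrite logic of the following Spring Cloud Gateway classes and reimplements how the Config is passed in reactively, so that request and response bodies can be modified inside a filter in a functional style, once and for all.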

org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory

org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory

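Modifying the body this way has advantages over the common methods found online.

Note that errors from a Mono or Flux must be handled, and not necessarily with try/catch: use reactive operators instead, for example Mono.just(xxx).doOnError(...). Keep in mind that doOnError only observes the error as a side effect; to recover from it, use an operator such as onErrorResume.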
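The point about try/catch can be illustrated without any Spring dependency. The sketch below is an analogy, not Reactor code: it uses the JDK's CompletableFuture as a stand-in for Mono, and the names AsyncErrorDemo, rewrite and pipeline are made up for illustration. An exception thrown inside an asynchronous stage does not reach a try/catch wrapped around the pipeline; it travels down the pipeline as an error signal and has to be handled by an operator, here exceptionally, which plays roughly the role that onErrorResume plays for a Mono.

```java
import java.util.concurrent.CompletableFuture;

class AsyncErrorDemo {

    // Hypothetical body rewrite that fails on an empty body.
    static String rewrite(String body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty body");
        }
        return body.toUpperCase();
    }

    // Builds and returns the asynchronous pipeline for one body.
    static CompletableFuture<String> pipeline(String body) {
        try {
            return CompletableFuture
                    .supplyAsync(() -> body)
                    .thenApply(AsyncErrorDemo::rewrite)
                    // Error handling lives in the pipeline itself.
                    .exceptionally(ex -> "fallback");
        } catch (IllegalArgumentException e) {
            // Never reached: an exception thrown by rewrite() is captured
            // by the pipeline and delivered as an error signal instead.
            return CompletableFuture.completedFuture("caught by try/catch");
        }
    }

    public static void main(String[] args) {
        System.out.println(pipeline("hello").join()); // HELLO
        System.out.println(pipeline("").join());      // fallback
    }
}
```

The same reasoning applies to the rewrite functions used later in this article: anything that can fail inside them should be handled with an operator on the returned Mono.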

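2.2 Basic Wrappers

Create three classes in the project and put them in a base package so that filters can call them. If Spring Cloud Gateway changes its request or response handling code, only the following classes need to be updated.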

RewriteConfig.java

import org.springframework.cloud.gateway.filter.factory.rewrite.RewriteFunction;

import java.util.Map;

public class RewriteConfig {

    private Class inClass;

    private Class outClass;

    private Map<String, Object> inHints;

    private Map<String, Object> outHints;

    private String newContentType;

    private String contentType;

    private RewriteFunction rewriteFunction;

    public Class getInClass() {
        return inClass;
    }

    public RewriteConfig setInClass(Class inClass) {
        this.inClass = inClass;
        return this;
    }

    public Class getOutClass() {
        return outClass;
    }

    public RewriteConfig setOutClass(Class outClass) {
        this.outClass = outClass;
        return this;
    }

    public Map<String, Object> getInHints() {
        return inHints;
    }

    public RewriteConfig setInHints(Map<String, Object> inHints) {
        this.inHints = inHints;
        return this;
    }

    public Map<String, Object> getOutHints() {
        return outHints;
    }

    public RewriteConfig setOutHints(Map<String, Object> outHints) {
        this.outHints = outHints;
        return this;
    }

    public String getNewContentType() {
        return newContentType;
    }

    public RewriteConfig setNewContentType(String newContentType) {
        this.newContentType = newContentType;
        return this;
    }

    public RewriteFunction getRewriteFunction() {
        return rewriteFunction;
    }

    public RewriteConfig setRewriteFunction(RewriteFunction rewriteFunction) {
        this.rewriteFunction = rewriteFunction;
        return this;
    }

    public <T, R> RewriteConfig setRewriteFunction(Class<T> inClass, Class<R> outClass,
                                            RewriteFunction<T, R> rewriteFunction) {
        setInClass(inClass);
        setOutClass(outClass);
        setRewriteFunction(rewriteFunction);
        return this;
    }

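    public String getContentType() {
        // Fall back to JSON when no content type has been set explicitly
        return contentType != null ? contentType : "application/json;charset=utf-8";
    }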

    public RewriteConfig setContentType(String contentType) {
        this.contentType = contentType;
        return this;
    }
}

ModifiedRequestDecorator.java

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Function;

public class ModifiedRequestDecorator {

    private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
    private final RewriteConfig config;

    public ModifiedRequestDecorator(ServerWebExchange exchange, RewriteConfig config) {
        this.config = config;
    }

    @SuppressWarnings("unchecked")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Class inClass = config.getInClass();
        ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);

        // TODO: flux or mono
        Mono<?> modifiedBody = serverRequest.bodyToMono(inClass)
                .flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
                .switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));

        BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, config.getOutClass());
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());

        // the new content type will be computed by bodyInserter
        // and then set in the request decorator
        headers.remove(HttpHeaders.CONTENT_LENGTH);

        // if the body is changing content types, set it here, so the bodyInserter
        // will know about it
        if (config.getContentType() != null) {
            headers.set(HttpHeaders.CONTENT_TYPE, config.getContentType());
        }
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                // .log("modify_request", Level.INFO)
                .then(Mono.defer(() -> {
                    ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                    return chain.filter(exchange.mutate().request(decorator).build());
                })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange,
                        outputMessage, throwable));

    }


    protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
                                 Throwable throwable) {
        return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
    }

    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                        CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                }
                else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' // on
                    // httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }
}
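The header handling in ModifiedRequestDecorator is easier to follow with concrete numbers. The following standalone sketch uses only the JDK; FramingDemo, framingHeaders and utf8Length are hypothetical names, and a plain Map stands in for HttpHeaders. It shows why the original Content-Length has to be dropped once the body is rewritten (the byte count changes) and mirrors the branch in decorate(): a positive length is sent as Content-Length, anything else falls back to Transfer-Encoding: chunked.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class FramingDemo {

    // Mirrors the branch in decorate(): known positive length, else chunked.
    static Map<String, String> framingHeaders(long contentLength) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (contentLength > 0) {
            headers.put("Content-Length", Long.toString(contentLength));
        } else {
            headers.put("Transfer-Encoding", "chunked");
        }
        return headers;
    }

    // Size of a body in bytes when encoded as UTF-8.
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String original = "hello";
        String rewritten = "{\"message\":\"HELLO\"}";
        System.out.println(utf8Length(original));   // 5
        System.out.println(utf8Length(rewritten));  // 19
        System.out.println(framingHeaders(utf8Length(rewritten)));
        System.out.println(framingHeaders(-1));
    }
}
```

Forwarding the original length of 5 with a 19-byte body would give the downstream service a wrong Content-Length, which is why the decorator removes the header before the body is rewritten.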

ModifiedResponseDecorator.java

import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.filter.factory.rewrite.GzipMessageBodyResolver;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyDecoder;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyEncoder;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;

public class ModifiedResponseDecorator extends ServerHttpResponseDecorator {

    private final ServerWebExchange exchange;

    private final RewriteConfig config;

    private final Map<String, MessageBodyDecoder> messageBodyDecoders;
    private final Map<String, MessageBodyEncoder> messageBodyEncoders;

    private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();


    public ModifiedResponseDecorator(ServerWebExchange exchange, RewriteConfig config) {
        super(exchange.getResponse());
        this.exchange = exchange;
        this.config = config;
        Set<MessageBodyDecoder> messageBodyDecodersSet = new HashSet<>();
        Set<MessageBodyEncoder> messageBodyEncodersSet = new HashSet<>();
        MessageBodyDecoder messageBodyDecoder = new GzipMessageBodyResolver();
        MessageBodyEncoder messageBodyEncoder = new GzipMessageBodyResolver();
        messageBodyDecodersSet.add(messageBodyDecoder);
        messageBodyEncodersSet.add(messageBodyEncoder);
        this.messageBodyDecoders = messageBodyDecodersSet.stream()
                .collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity()));
        this.messageBodyEncoders = messageBodyEncodersSet.stream()
                .collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity()));
    }


    @SuppressWarnings("unchecked")
    @Override
    public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

        Class inClass = config.getInClass();
        Class outClass = config.getOutClass();

        String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
        HttpHeaders httpHeaders = new HttpHeaders();
        // explicitly add it in this way instead of
        // 'httpHeaders.setContentType(originalResponseContentType)'
        // this will prevent exception in case of using non-standard media
        // types like "Content-Type: image"
        httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);

        ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);

        // TODO: flux or mono
        Mono modifiedBody = extractBody(exchange, clientResponse, inClass)
                .flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
                .switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));

        BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
                exchange.getResponse().getHeaders());
        return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
            Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
            HttpHeaders headers = getDelegate().getHeaders();
            if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
            }
            // TODO: fail if isStreamingMediaType?
            return getDelegate().writeWith(messageBody);
        }));
    }

    @Override
    public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body).flatMapSequential(p -> p));
    }

    private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
        ClientResponse.Builder builder;
        builder = ClientResponse.create(exchange.getResponse().getStatusCode(), messageReaders);
        return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
    }

    private <T> Mono<T> extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class<T> inClass) {
        // if inClass is byte[] then just return body, otherwise check if
        // decoding required
        if (byte[].class.isAssignableFrom(inClass)) {
            return clientResponse.bodyToMono(inClass);
        }

        List<String> encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
        for (String encoding : encodingHeaders) {
            MessageBodyDecoder decoder = messageBodyDecoders.get(encoding);
            if (decoder != null) {
                return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode)
                        .map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes))
                        .map(buffer -> prepareClientResponse(Mono.just(buffer),
                                exchange.getResponse().getHeaders()))
                        .flatMap(response -> response.bodyToMono(inClass));
            }
        }

        return clientResponse.bodyToMono(inClass);
    }

    private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message,
                                       Class<?> outClass) {
        Mono<DataBuffer> response = DataBufferUtils.join(message.getBody());
        if (byte[].class.isAssignableFrom(outClass)) {
            return response;
        }

        List<String> encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
        for (String encoding : encodingHeaders) {
            MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
            if (encoder != null) {
                DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
                response = response.publishOn(Schedulers.parallel()).map(buffer -> {
                    byte[] encodedResponse = encoder.encode(buffer);
                    DataBufferUtils.release(buffer);
                    return encodedResponse;
                }).map(dataBufferFactory::wrap);
                break;
            }
        }
        return response;
    }

}
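For a response with Content-Encoding: gzip, the decorator above decodes the body, applies the rewrite function to the decoded content, and encodes the result again before writing it out. The sketch below reproduces that decode, rewrite, encode sequence with java.util.zip only, so it runs without Spring. GzipRewriteDemo and its methods are illustrative names; this is not the code of GzipMessageBodyResolver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.UnaryOperator;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipRewriteDemo {

    // Compresses plain bytes into a gzip stream.
    static byte[] gzip(byte[] plain) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip trailer has been written because the stream is closed.
        return bos.toByteArray();
    }

    // Decompresses a gzip stream back into plain bytes.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                bos.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decode, apply the rewrite to the text, then encode again.
    static byte[] rewriteGzipBody(byte[] compressed, UnaryOperator<String> rewrite) {
        String decoded = new String(gunzip(compressed), StandardCharsets.UTF_8);
        String modified = rewrite.apply(decoded);
        return gzip(modified.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] upstream = gzip("hello".getBytes(StandardCharsets.UTF_8));
        byte[] downstream = rewriteGzipBody(upstream, String::toUpperCase);
        System.out.println(new String(gunzip(downstream), StandardCharsets.UTF_8)); // HELLO
    }
}
```

Applying the rewrite directly to the compressed bytes would corrupt the stream, which is why the decoding step has to come first whenever the target type is not byte[].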

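Modifying the request

Reference code for the return statement of the filter() method:

    // Modify the request body; modifyRequestBody is a placeholder for your own method
    return new ModifiedRequestDecorator(exchange, new RewriteConfig()
            .setRewriteFunction(String.class, String.class,
                    (ex, requestData) -> Mono.just(modifyRequestBody(requestData))))
            .filter(exchange, chain);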
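The placeholder method in the snippet above is left to the reader. One detail worth knowing when writing it: ModifiedRequestDecorator calls the rewrite function with null when the request has no body (the switchIfEmpty branch), and Mono.just rejects a null value. A minimal sketch of such a method, using only the JDK, is shown below. BodyRewriter and rewriteRequestBody are hypothetical names, and wrapping the payload in a JSON field is just an example transformation.

```java
class BodyRewriter {

    // Hypothetical rewrite: wraps the incoming body in a JSON envelope.
    // Accepts null (empty request body) and never returns null,
    // so the result is always safe to pass to Mono.just(...).
    static String rewriteRequestBody(String body) {
        String payload = (body == null) ? "" : body.trim();
        return "{\"message\":\"" + escape(payload) + "\"}";
    }

    // Minimal escaping of backslashes and double quotes only;
    // a real project would delegate this to a JSON library.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(rewriteRequestBody(null));      // {"message":""}
        System.out.println(rewriteRequestBody(" hello ")); // {"message":"hello"}
    }
}
```

If the rewrite may legitimately produce no value, Mono.justOrEmpty is the safer wrapper than Mono.just.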

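Modifying the response

Reference code for the return statement of the filter() method:

    // Modify the response body; modifyResponseBody is a placeholder for your own method
    return chain.filter(exchange.mutate().response(
            new ModifiedResponseDecorator(exchange, new RewriteConfig()
                    .setRewriteFunction(String.class, String.class,
                            (ex, responseData) -> Mono.just(modifyResponseBody(responseData)))))
            .build());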

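Modifying both the request and the response

Reference code for the return statement of the filter() method:

    // Modify the request body; modifyRequestBody is a placeholder for your own method
    return new ModifiedRequestDecorator(exchange, new RewriteConfig()
            .setRewriteFunction(String.class, String.class,
                    (ex, requestData) -> Mono.just(modifyRequestBody(requestData))))
            .filter(exchange.mutate().response(
                    // Modify the response body; modifyResponseBody is a placeholder for your own method
                    new ModifiedResponseDecorator(exchange, new RewriteConfig()
                            .setRewriteFunction(String.class, String.class,
                                    (ex, responseData) -> Mono.just(modifyResponseBody(responseData)))))
                    .build(), chain);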