Spring Cloud Gateway 获取Request和Response Body

2021-10-12  本文已影响0人  幻一夜

gateway.version: 2.1.5.RELEASE

获取Request Body:

自定义RequestDecorator


@Component
public class CacheBodyGlobalFilterimplements Ordered, GlobalFilter {
        
      private static final LoggerLOGGER = LoggerFactory.getLogger(CacheBodyGlobalFilter.class);

      @Autowired
      private IdmInvocationSecurityMetadataSourcemetadataSource;

      @Override
      public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) {

            if (exchange.getRequest().getHeaders().getContentType() ==null) {
                return chain.filter(exchange);
            } else {
                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
                        if (dataBuffer.readableByteCount() >0) {
                              exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, dataBuffer);
     }
          // 自定义Decorator获取body
             ServerHttpRequest decorator =new ServerHttpRequestDecorator(exchange.getRequest()) {
                   @Override
                   public FluxgetBody() {
                          return Mono.fromSupplier(() -> {
                        // gateway 向服务转发请求时还会调用一次getBody第二次直接返回dataBuffer
                    if (exchange.getAttributeOrDefault(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR,null) ==null) {

                return dataBuffer;

}

                  // TODO: deal with Netty                    
                    NettyDataBuffer pdb = (NettyDataBuffer)dataBuffer;
                            return pdb.factory().wrap(pdb.getNativeBuffer().retainedSlice());
                  }).flux();
              }
};

            return chain.filter(exchange.mutate().request(decorator).build());
        });
      }
}

@Override
  public int getOrder() {
        return -4;
  }

}

在其他Filter中获取body

// Read the request body exposed by the request of the current exchange.
Flux<DataBuffer> dataBufferFlux = exchange.getRequest().getBody();

// Holder that lets the subscriber callback hand the decoded text back to the enclosing code.
AtomicReference<String> bodyRef = new AtomicReference<>();

// Decode each emitted buffer as UTF-8 and release it afterwards.
dataBufferFlux.subscribe(dataBuffer -> {
    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
    DataBufferUtils.release(dataBuffer);
    bodyRef.set(charBuffer.toString());
});

// NOTE(review): this stays null unless the publisher emitted before subscribe() returned — confirm
// that the upstream body is always served synchronously.
String body = bodyRef.get();

// Drop the cache attribute once the body has been read.
exchange.getAttributes().remove(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR);

获取Response Body:


@Component

public class GatewayResponseFilterimplements GlobalFilter, Ordered {

private static final LoggerLOGGER = LoggerFactory.getLogger(GatewayResponseFilter.class);

        @Override
        public int getOrder() {
            return -2;
        }

@Override
  public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) {
      //获取response的 返回数据
      ServerHttpResponse originalResponse = exchange.getResponse();

      DataBufferFactory bufferFactory = originalResponse.bufferFactory();

      ServerHttpResponseDecorator response =new ServerHttpResponseDecorator(originalResponse) {

@Override

        public MonowriteWith(Publisher body) {

if (getStatusCode().equals(HttpStatus.OK) && bodyinstanceof Flux) {

// 获取ContentType,判断是否返回JSON格式数据

              String originalResponseContentType =exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);

              if (org.apache.commons.lang.StringUtils.isNotBlank(originalResponseContentType) && originalResponseContentType.contains("application/json")) {

                  Flux fluxBody = Flux.from(body);
                  return super.writeWith(fluxBody.buffer().map(dataBuffers -> {//解决返回体分段传输
                    DataBufferFactory dataBufferFactory =new DefaultDataBufferFactory();
                    String responseData =null;
                    try {
                        DataBuffer dataBuffer = dataBufferFactory.join(dataBuffers);
                        byte[] content =new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(content);
                        DataBufferUtils.release(dataBuffer);
                        responseData =new String(content, "UTF-8");
                    }catch (UnsupportedEncodingException e) {
                        LOGGER.info("获取response body,失败原因:{}", Throwables.getStackTraceAsString(e));
                    }

/                          / 二次处理(加密/过滤等)如果不需要做二次处理可直接跳过下行

                            // body转码

                    byte[] uppedContent =new String(responseData.getBytes(), Charset.forName("UTF-8")).getBytes();

                    originalResponse.getHeaders().setContentLength(uppedContent.length);

                    return bufferFactory.wrap(uppedContent);

                  }));

              }

}

          return super.writeWith(body);
 }

@Override

        public MonowriteAndFlushWith(Publisher> body) {

return writeWith(Flux.from(body).flatMapSequential(p -> p));

        }

};

      return chain.filter(exchange.mutate().response(response).build());

  }

上一篇 下一篇

猜你喜欢

热点阅读