spring cloud gateway架构

spring cloud gateway 的执行流程

2021-10-27  本文已影响0人  virtual灬zzZ

spring cloud gateway 流程:

spring cloud gateway 官网的流程图

具体执行流程:

  1. DispatcherHandler :接收到请求后匹配 HandlerMapping ,此处会匹配到 RoutePredicateHandlerMapping ;
  2. RoutePredicateHandlerMapping :匹配 Route ;
  3. FilteringWebHandler :获取 Route 的 GatewayFilter 列表,创建 GatewayFilterChain 来处理请求。

DispatcherHandler

请求分发处理器,是WebFlux的访问入口。看这似曾相识的样子,没错,对应到Spring MVC中,跟它承担类似作用的,就是DispatcherServlet。DispatcherHander也和DispatcherServlet的请求处理流程类似:


public class DispatcherHandler implements WebHandler, ApplicationContextAware {
    private List<HandlerMapping> handlerMappings;
    private List<HandlerAdapter> handlerAdapters;
    private List<HandlerResultHandler> resultHandlers;

        // 1. 初始化时从BeanFactory分别拉取类型为:HandlerMapping、HandlerAdapter、HandlerResultHandler
        //    的所有Bean,填充到:handlerMappings、handlerAdapters、resultHandlers
    protected void initStrategies(ApplicationContext context) {
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false);

        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
                // 对所有的HandlerMapping排序,实现按优先级选择HandlerMapping使用的关键
                // 因为HandlerMapping的作用是判断一个请求究竟适不适用于它所有对应的Handler
                // 最终是通过Handler去处理一个请求的,所以假设我们有一组HandlerMapping
                // 这时来了个请求,我们就先把HandlerMapping按getOrder()拿到的优先级排序
                // 然后依次做HandlerMapping.getHandler(request);
                // 如果这个过程是顺序的,或者说同步的,那么第一次有某个HandlerMapping
                // 拿到了Handler的话,说明它就是我们需要的Mr Right,方法可以直接返回这个了。
                // 其实在Spring MVC里 DispatcherServlet就是通过 for if != null return
                // 这样去顺序取HandlerMapping尝试获取Handler的。
                // 但是在Spring WebFlux里,Reactor编程将这个过程异步化了:
                // 通过concatMap()将所有的HandlerMapping异步getHandler(request);
                // 它不保证每个HandlerMapping执行getHandler的顺序,但保证最终得到的
                // 一组结果,是按原先HandlerMapping的顺序排好的,所以只用next()来获取
                // 第一个Handler,就得到了符合HandlerMapping优先级的Handler,得到了
                // 与同步执行同样的结果。假设HandlerMapping比较多,或者每个HandlerMapping
                // 执行getHandler()的时间比较久,那么异步是有优势的,它相当于是把这个过程
                // 并行化了。但也并非一定总有优势,要并行就需要分配多个线程去做任务,也涉及到
                // 最终结果的归并。多个线程会占用额外的cpu资源,与其他任务竞争时间片,CPU调度
                // 线程造成上下文切换消耗也增加了。并且每个getHandler的线程计算结果是需要维护
                // 对应的优先级的,因为最终合并结果需要保持不改变顺序。
        AnnotationAwareOrderComparator.sort(mappings);
        this.handlerMappings = Collections.unmodifiableList(mappings);

        Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerAdapter.class, true, false);

        this.handlerAdapters = new ArrayList<>(adapterBeans.values());
        AnnotationAwareOrderComparator.sort(this.handlerAdapters);

        Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerResultHandler.class, true, false);

        this.resultHandlers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(this.resultHandlers);
    }

        // 2.遍历handlerMappings,获取对应的WebHandler
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                                // a.此处会匹配到RoutePredicateHandlerMapping
                                // b.RoutePredicateHandlerMapping匹配请求对应的 Route
                                // c.返回FilteringWebHandler
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                                // d.执行Handler
                .flatMap(handler -> invokeHandler(exchange, handler))
                                // e.处理result
                .flatMap(result -> handleResult(exchange, result));
    }

        // 3.执行Handler,得到HandlerResult
    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

        // 5.处理返回结果
    private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        return getResultHandler(result).handleResult(exchange, result)
                .checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]")
                .onErrorResume(ex ->
                        result.applyExceptionHandler(ex).flatMap(exResult -> {
                            String text = "Exception handler " + exResult.getHandler() +
                                    ", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";
                            return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);
                        }));
    }

        // 4.根据Result类型,获取处理返回结果的Handler
    private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
        if (this.resultHandlers != null) {
            for (HandlerResultHandler resultHandler : this.resultHandlers) {
                if (resultHandler.supports(handlerResult)) {
                    return resultHandler;
                }
            }
        }
        throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
    }
}
上面的dispatcherHandler的流程简单点说:先初始换那些handlerMapping、handlerAdater、resultAdater,从中遍历了handlerMapping,最终它找到RoutePredicateHandlerMapping,就是开始第二步骤。

RoutePredicateHandlerMapping

执行他的核心方法:

@Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for ["
                                + getExchangeDesc(exchange) + "]");
                    }
                })));
    }


protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
                // individually filter routes so that filterWhen error delaying is not a
                // problem
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    return r.getPredicate().apply(exchange);
                })
                        // instead of immediately stopping main flux due to error, log and
                        // swallow it
                        .doOnError(e -> logger.error(
                                "Error applying predicate for route: " + route.getId(),
                                e))
                        .onErrorResume(e -> Mono.empty()))
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /*
         * TODO: trace logging if (logger.isTraceEnabled()) {
         * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
         */
    }
看lookupRoute方法就知道去找相应的Route了,是根据routeId去找的。找到相应的route,再返回FilteringWebHandler。

FilteringWebHandler 过滤web请求处理

她的本职工作是将GlobalFilter(全局过滤器)、GatewayFilter(一般来说是我们自己配置的,当然也默认内置了一些,也内含了自己在application配置文件中配的defaultFilter[如果有陪置的话]),放到同一个List里进行优先级排序,生成一个过滤器链。执行过滤器链,就能顺序执行链中保存的所有Filter。

public class FilteringWebHandler implements WebHandler {
    // 所有全局过滤器
    private final List<GatewayFilter> globalFilters;

    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        // 1.初始化时加载所有全局过滤器,将他们适配为GatewayFilter类型,方便等会做合并
        this.globalFilters = loadFilters(globalFilters);
    }
    // 适配器模式,通过两次套娃把GlobalFilter封装成GatewayFilter类型
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
    return filters.stream().map(filter -> {
            // 2.Adapter持有GlobalFilter的引用
        GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
        if (filter instanceof Ordered) {
        int order = ((Ordered) filter).getOrder();
                // 3.OrderedGatewayFilter再持有一个Adapter
                // 其实这里搞的挺无语的,那我直接返回Adapter又不是不能用,毕竟接口相同
                // 说白了,还是跟职责隔离、设计解耦有关
                // 就好比有时我们会纠结干嘛不直接把路由配置读成Route是类似的
                // 因为我们的思维模式,就是快点达到目的,而忽略了扩展性
                // 假设现在我们需要有一个服务提供多个接口,包括了Filter接口的功能
                // 那么,我们还是直接在Adapter上加接口,再持有一个跟它功能无关的工具人干活?
                // 那职责就完全乱套了,我们大可以这样:
                // Kunkun implements ISing,IJump,IRap,IBasketball {
                //     private ISing singAdapter;
                //     private IJump jumpAdapter;
                //     private IRap rapAdapter;
                //     private IBasketball basketballAdapter;
                // }
                // 那么坤坤就会唱、跳、rap、篮球了
        return new OrderedGatewayFilter(gatewayFilter, order);
        }
        return gatewayFilter;
        }).collect(Collectors.toList());
}

public Mono<Void> handle(ServerWebExchange exchange) {
    // 1.从exchange里拿到第二步匹配好的Route(匹配到后塞进去的)
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // 2.通过这个Route拿到它对应的GatewayFilter列表
    List<GatewayFilter> gatewayFilters = route.getFilters();
    // 3.把前面已经适配为GatewayFilter的全局过滤器拿过来,初始化一个总的过滤器列表
    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
    // 4.然后把从Route里拿到的过滤器列表也塞进去合并
    combined.addAll(gatewayFilters);
    // 5.对总的过滤器列表按优先级排序
    AnnotationAwareOrderComparator.sort(combined);
    // 6.通过这个总的过滤器列表构造一个过滤器链来执行过滤逻辑
    return new DefaultGatewayFilterChain(combined).filter(exchange);
}
所以总结就是将globalFilter和route的filters结合在一块,形成一个过滤器链,之后请求就去逐个执行过滤器链的过滤器。

参考文章:

spring cloud gateway 专题收录1

spring cloud gateway 专题收录2

上一篇下一篇

猜你喜欢

热点阅读