Zuul Source Code Analysis (Part 2): BaseFilter

2018-11-12  skyguard

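Next we look at how Zuul processes filters, starting with the class BaseFilter. It implements the ZuulFilter interface and adds concurrency control around filter execution. Here is the relevant implementation: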

public void incrementConcurrency() throws ZuulFilterConcurrencyExceededException {
    final int limit = filterConcurrencyLimit.get();
    if ((concurrencyProtectEnabled.get()) && (concurrentCount.get() >= limit)) {
        concurrencyRejections.increment();
        throw new ZuulFilterConcurrencyExceededException(this, limit);
    }
    concurrentCount.incrementAndGet();
}

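In other words, when concurrency protection is enabled and the number of in-flight executions of this filter has already reached limit, the call is rejected: the rejection counter is incremented and a ZuulFilterConcurrencyExceededException is thrown. Otherwise the in-flight count is incremented and the filter may proceed.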
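The guard above can be illustrated in isolation. The following is a minimal sketch, not Zuul's code: ConcurrencyGate, its fields, and the use of IllegalStateException in place of ZuulFilterConcurrencyExceededException are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BaseFilter's concurrency guard; not Zuul's actual class.
final class ConcurrencyGate {
    private final int limit;               // plays the role of filterConcurrencyLimit
    private final boolean protectEnabled;  // plays the role of concurrencyProtectEnabled
    private final AtomicInteger concurrentCount = new AtomicInteger();
    private final AtomicInteger rejections = new AtomicInteger();

    ConcurrencyGate(int limit, boolean protectEnabled) {
        this.limit = limit;
        this.protectEnabled = protectEnabled;
    }

    // Same check-then-increment shape as incrementConcurrency().
    void increment() {
        if (protectEnabled && concurrentCount.get() >= limit) {
            rejections.incrementAndGet();
            // Assumption: a plain unchecked exception replaces Zuul's own exception type.
            throw new IllegalStateException("concurrency limit " + limit + " reached");
        }
        concurrentCount.incrementAndGet();
    }

    // The matching release, to be called when an execution finishes.
    void decrement() {
        concurrentCount.decrementAndGet();
    }

    int inFlight() { return concurrentCount.get(); }

    int rejected() { return rejections.get(); }
}

final class ConcurrencyGateDemo {
    public static void main(String[] args) {
        ConcurrencyGate gate = new ConcurrencyGate(2, true);
        gate.increment();
        gate.increment();
        try {
            gate.increment();  // a third concurrent execution is refused
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        gate.decrement();
        gate.increment();      // a slot is free again
        System.out.println(gate.inFlight() + " in flight, " + gate.rejected() + " rejected");
    }
}
```

Because the check and the increment are two separate steps, the limit is a soft one: under contention a few callers can pass the check at the same moment. The same is true of the check-then-increment shape in the method shown above.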
BaseZuulFilterRunner contains the logic that runs a message through the filters; it implements the FilterRunner interface. Here is the implementation:

private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
    T inMesg = mesg;
    String filterName = "-";
    try {
        Preconditions.checkNotNull(mesg, "Input message");
        int i = runningFilterIdx.get();

        while (i < filters.length) {
            final ZuulFilter<T, T> filter = filters[i];
            filterName = filter.filterName();
            final T outMesg = filter(filter, inMesg);
            if (outMesg == null) {
                return; //either async filter or waiting for the message body to be buffered
            }
            inMesg = outMesg;
            i = runningFilterIdx.incrementAndGet();
        }

        //Filter chain has reached its end, pass result to the next stage
        invokeNextStage(inMesg);
    }
    catch (Exception ex) {
        handleException(inMesg, filterName, ex);
    }
}
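The loop above keeps its position in a shared index and treats a null output as "pause here". A simplified sketch of that pattern follows. ResumableChain, its String messages, and the resume method are hypothetical names chosen for illustration; how Zuul itself resumes a paused chain is not shown in the excerpt, so resume is an assumption.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of the runFilters loop; names are illustrative.
final class ResumableChain {
    private final List<UnaryOperator<String>> filters;
    private final AtomicInteger runningIdx = new AtomicInteger();
    private String result;  // set once the whole chain has completed

    ResumableChain(List<UnaryOperator<String>> filters) {
        this.filters = filters;
    }

    // Runs filters starting at the saved index; a null output pauses the chain.
    void run(String msg) {
        String in = msg;
        int i = runningIdx.get();
        while (i < filters.size()) {
            String out = filters.get(i).apply(in);
            if (out == null) {
                return;  // paused: the index still points at the filter that yielded
            }
            in = out;
            i = runningIdx.incrementAndGet();
        }
        result = in;     // stands in for invokeNextStage(...)
    }

    // Assumption: called later with the output the paused filter eventually produced.
    void resume(String out) {
        runningIdx.incrementAndGet();  // step past the filter that just finished
        run(out);
    }

    String result() { return result; }
}
```

Keeping the index outside the loop is what makes the chain re-entrant: a later call can pick up exactly where the earlier one stopped without re-running filters that already completed.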

Next, the body of the filter method:

    final long startTime = System.currentTimeMillis();
    final ZuulMessage snapshot = inMesg.getContext().debugRouting() ? inMesg.clone() : null;
    FilterChainResumer resumer = null;

    try {
        ExecutionStatus filterRunStatus = null;
        if (filter.filterType() == INBOUND && inMesg.getContext().shouldSendErrorResponse()) {
            // Pass request down the pipeline, all the way to error endpoint if error response needs to be generated
            filterRunStatus = SKIPPED;
        }

        if (shouldSkipFilter(inMesg, filter)) {
            filterRunStatus = SKIPPED;
        }

        if (filter.isDisabled()) {
            filterRunStatus = DISABLED;
        }

        if (filterRunStatus != null) {
            recordFilterCompletion(filterRunStatus, filter, startTime, inMesg, snapshot);
            return filter.getDefaultOutput(inMesg);
        }

        if (!isMessageBodyReadyForFilter(filter, inMesg)) {
            setFilterAwaitingBody(inMesg, true);
            LOG.debug("Filter {} waiting for body, UUID {}", filter.filterName(), inMesg.getContext().getUUID());
            return null;  //wait for whole body to be buffered
        }
        setFilterAwaitingBody(inMesg, false);

        if (snapshot != null) {
            Debug.addRoutingDebug(inMesg.getContext(), "Filter " + filter.filterType().toString() + " " + filter.filterOrder() + " " + filter.filterName());
        }

        //run body contents accumulated so far through this filter
        inMesg.runBufferedBodyContentThroughFilter(filter);

        if (filter.getSyncType() == FilterSyncType.SYNC) {
            final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
            final O outMesg = syncFilter.apply(inMesg);
            recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
            return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
        }

        // async filter
        filter.incrementConcurrency();
        resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
        filter.applyAsync(inMesg)
            .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
            .doOnUnsubscribe(resumer::decrementConcurrency)
            .subscribe(resumer);

        return null;  //wait for the async filter to finish
    }
    catch (Throwable t) {
        if (resumer != null) {
            resumer.decrementConcurrency();
        }
        final O outMesg = handleFilterException(inMesg, filter, t);
        outMesg.finishBufferedBodyIfIncomplete();
        recordFilterCompletion(FAILED, filter, startTime, inMesg, snapshot);
        return outMesg;
    }
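The synchronous path of the method above comes down to three decisions: whether the filter runs at all, what to return when it produces nothing, and what to do when it throws. A minimal sketch of that flow follows. MiniFilter, MiniRunner, and RunStatus are hypothetical types, messages are plain strings, and the pass-through on failure is an assumption, since handleFilterException is not shown in the excerpt.

```java
// Hypothetical status values, modelled on the constants used in the method above.
enum RunStatus { SKIPPED, DISABLED, SUCCESS, FAILED }

// Hypothetical miniature of a synchronous filter working on String messages.
interface MiniFilter {
    boolean shouldSkip(String msg);
    boolean isDisabled();
    String apply(String msg);  // may return null or throw
    default String defaultOutput(String msg) { return msg; }  // pass the input through
}

final class MiniRunner {
    RunStatus lastStatus;  // stands in for recordFilterCompletion(...)

    String filter(MiniFilter f, String in) {
        try {
            RunStatus status = null;
            if (f.shouldSkip(in)) {
                status = RunStatus.SKIPPED;
            }
            if (f.isDisabled()) {
                status = RunStatus.DISABLED;  // checked last, so it overrides SKIPPED
            }
            if (status != null) {
                lastStatus = status;
                return f.defaultOutput(in);   // filter not run, message passes through
            }
            String out = f.apply(in);
            lastStatus = RunStatus.SUCCESS;
            return (out != null) ? out : f.defaultOutput(in);
        } catch (RuntimeException e) {
            lastStatus = RunStatus.FAILED;
            // Assumption: fall back to the default output on failure.
            return f.defaultOutput(in);
        }
    }
}
```

Note that on this path the method never returns null: skipped, disabled, empty, and (in this sketch) failing filters all yield a message. In the original, null is reserved for the two "wait" cases, an incomplete body or an async filter.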

Zuul's filter processing logic is fairly simple: BaseZuulFilterRunner drives each filter in turn, skipping or pausing where needed.
That concludes the analysis of BaseFilter.
