Java

SpringMVC 篇(四)异步请求

2021-02-03  本文已影响0人  Tubetrue01

引言

本篇文章简单介绍一下 SpringMVC 中的异步请求。由于 SpringMVC 其实就是 Servlet 的封装,那么异步也是如此,底层调用的还是 Servlet 中的异步,只不过 Spring 实现起来更简单一些。

起源

在 Servlet 3.0 之前,Servlet 采用 Thread-Per-Request 的方式处理请求,即每一次 Http 请求都由某一个线程从头到尾负责处理。如果一个请求需要进行 IO 操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待 IO 操作完成, 而 IO 操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。为了解决这样的问题,Servlet 3.0 引入了异步处理,然后在 Servlet 3.1 中又引入了非阻塞 IO 来进一步增强异步处理的性能。

SpringMVC 的异步

把同步方法变成异步方法很简单,只是将 Controller 的返回值用 DeferredResult 封装就行,比如:
示例:

   @GetMapping("/async")
    public DeferredResult<String> say() {
        DeferredResult<String> result = new DeferredResult<>();

        AsyncUtils.async(() -> {
            result.setResult("Hello World");
        }, result);

        return result;
    }

是不是很简单?那么 SpringMVC 是如何实现异步的相关操作呢?(AsyncUtils 中的代码由另外的线程负责处理)有了前几篇文章的铺垫,我想你肯定能够猜到,其实还是由特定的返回值处理器来完成的。

DeferredResultMethodReturnValueHandler

通过它的名字,我们可以看出它其实是一个返回值处理器,也就是 HandlerMethodReturnValueHandler 的子类,我们看一下它的核心代码。

    // 从这里我们可以看出,当返回值是 DeferredResult、ListenableFuture、CompletionStage 三者之一时,该处理器就会生效
    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> type = returnType.getParameterType();
        return (DeferredResult.class.isAssignableFrom(type) ||
                ListenableFuture.class.isAssignableFrom(type) ||
                CompletionStage.class.isAssignableFrom(type));
    }

    // 处理返回值的具体逻辑 
    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            return;
        }

        DeferredResult<?> result;
        // 我们用的最多的就是 DeferredResult,所以这里只介绍 DeferredResult 的相关处理
        if (returnValue instanceof DeferredResult) {
            result = (DeferredResult<?>) returnValue;
        }
        else if (returnValue instanceof ListenableFuture) {
            result = adaptListenableFuture((ListenableFuture<?>) returnValue);
        }
        else if (returnValue instanceof CompletionStage) {
            result = adaptCompletionStage((CompletionStage<?>) returnValue);
        }
        else {
            throw new IllegalStateException("Unexpected return value type: " + returnValue);
        }
        // 通过异步工具类从当前请求中获取先前创建的异步管理器,然后开始异步处理工作
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
    }

其实这个处理器很简单,我们在最后发现,其实它是把相关的处理交给了 WebAsyncManager 中的 startDeferredResultProcessing() 方法。至于 WebAsyncUtils.getAsyncManager() 方法,前几篇文章中也有所介绍,就是请求进来的时候,会为当前请求创建出对应的异步管理器,然后跟 request 绑定(其实就是放到了 request 的 attributes 表中)而后续的操作都是通过这个异步管理器来判断是否是异步请求以及异步的相关操作。所以我们重点看一下这个异步管理器。

WebAsyncManager

有一点是我们需要注意的,就是每个请求对象都会绑定自己的异步管理器 WebAsyncManager,那么就避免了请求之间的并发问题。

  // 空结果对象
  private static final Object RESULT_NONE = new Object();
  // 执行异步任务的线程池
  private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR =
            new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());

  // 用于 Callable 异步任务的超时拦截器
  private static final CallableProcessingInterceptor timeoutCallableInterceptor =
            new TimeoutCallableProcessingInterceptor();
  // 用于 DeferredResult 的超时拦截器
  private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
            new TimeoutDeferredResultProcessingInterceptor();
  // 异步请求对象(StandardServletAsyncWebRequest)
  private AsyncWebRequest asyncWebRequest;

  // 一些额外对象,比如:视图解析器
  private volatile Object[] concurrentResultContext;

  // 异步任务处理的方法
  public void startDeferredResultProcessing(
            final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
        // deferredResult 就是 controller 返回的 DeferredResult 对象(相比于同步方法,异步请求的方法需要由 DeferredResult 进行封装)
        Assert.notNull(deferredResult, "DeferredResult must not be null");
        // 这里的 asyncWebRequest 就是对 Request 的封装
        Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
       // 异步请求的超时时间,用户可以自定义
        Long timeout = deferredResult.getTimeoutValue();
        if (timeout != null) {
            this.asyncWebRequest.setTimeout(timeout);
        }
        // 注意这里是线程独享的拦截器,每个请求之间互相独立
        List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
        // deferredResult 自定义的拦截器 
        interceptors.add(deferredResult.getInterceptor());
        // 系统定义的拦截器
        interceptors.addAll(this.deferredResultInterceptors.values());
        // 超时处理拦截器
        interceptors.add(timeoutDeferredResultInterceptor);
        // 封装成拦截器链 
        final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
        // 将超时拦截器注册到请求对象上 
        this.asyncWebRequest.addTimeoutHandler(() -> {
            try {
                interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
            }
            catch (Throwable ex) {
                setConcurrentResultAndDispatch(ex);
            }
        });
        // 为请求对象注册错误处理器
        this.asyncWebRequest.addErrorHandler(ex -> {
            if (!this.errorHandlingInProgress) {
                try {
                    if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
                        return;
                    }
                    deferredResult.setErrorResult(ex);
                }
                catch (Throwable interceptorEx) {
                    setConcurrentResultAndDispatch(interceptorEx);
                }
            }
        });
        // 为请求对象添注册异步任务执行完成的处理器
        this.asyncWebRequest.addCompletionHandler(()
                -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));
        // 并发处理之前的回调
        interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
        // 初始化上下文对象,为后续的响应做准备
        startAsyncProcessing(processingContext);

        try {
            // 结果处理之前的回调 
            interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
            // 设置结果处理器给 DeferredResult,这里会设置回调函数,也就是说 Tomcat 线程到这里就已经释放了
            deferredResult.setResultHandler(result -> {
                // 当结果异步处理之后会进入该方法
                result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
                // 结果响应,该方法会调用 StandardServletAsyncWebRequest 中的 dispatch() 方法进行处理 
                setConcurrentResultAndDispatch(result);
            });
        }
        catch (Throwable ex) {
            setConcurrentResultAndDispatch(ex);
        }
  }

startAsyncProcessing(processingContext)

    // 开始异步相关处理工作
    private void startAsyncProcessing(Object[] processingContext) {
        // 注意这里的加锁操作,锁对象是 WebAsyncManager,虽然每个 Request 对应一个 WebAsyncManager 对象,但是同一个 Request 会有多个线程访问 WebAsyncManager 对象
        synchronized (WebAsyncManager.this) {
            this.concurrentResult = RESULT_NONE;
            this.concurrentResultContext = processingContext;
            this.errorHandlingInProgress = false;
        }
        // 标记异步处理的开始,这里的 asyncWebRequest 其实是 StandardServletAsyncWebRequest
        this.asyncWebRequest.startAsync();

        if (logger.isDebugEnabled()) {
            logger.debug("Started async request");
        }
    }

StandardServletAsyncWebRequest

从 startAsyncProcessing 方法中我们可以知道实际的异步处理是调用的 asyncWebRequest.startAsync() 方法,而 StandardServletAsyncWebRequest 就是 asyncWebRequest 的子类。(那么这个对象是什么时候创建出来的呢?其实是在 RequestMappingHandlerAdapter 中的 invokeHandlerMethod 方法通过调用 WebAsyncUtils.createAsyncWebRequest(request, response) 方法创建的。) 我们继续看一下它的源码:

    @Override
    public void startAsync() {
        // 注意必须开启 async 的支持
        Assert.state(getRequest().isAsyncSupported(),
                "Async support must be enabled on a servlet and for all filters involved " +
                "in async request processing. This is done in Java code using the Servlet API " +
                "or by adding \"<async-supported>true</async-supported>\" to servlet and " +
                "filter declarations in web.xml.");
        // 查看当前异步任务的状态,为什么任务还没有开始就会被完成呢?大家如果思考一下就会发现,其实在执行任务之前,有些拦截器就已经开始工作了,它们可能就会修改异步结果 
        Assert.state(!isAsyncComplete(), "Async processing has already completed");
        // 如果异步已经开始了,直接返回
        if (isAsyncStarted()) {
            return;
        }
        // 获取实际的 Request (RequestFacade) 对象,并调用其异步方法(只是标记当前状态为:STARTING)
        this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
        // 将先前组册的监听器绑定到实际的 Request 对象上
        this.asyncContext.addListener(this);
        // 设置超时时间
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }
    }
    // 调用实际 Request 对象的 dispatch() 方法
    @Override
    public void dispatch() {
        Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
        this.asyncContext.dispatch();
    }

DeferredResult

如果想开启异步的支持,需要将 Controller 的返回值用 DeferredResult 进行封装,然后被返回值解析器解析。当调用 DeferredResult 的 setResult 方法的时候,响应会发送给客户端,完成整个请求。

    // 供使用者调用以设置响应结果 
    public boolean setResult(T result) {
        return setResultInternal(result);
    }

    private boolean setResultInternal(Object result) {
        // 判断请求是否过期
        if (isSetOrExpired()) {
            return false;
        }
        DeferredResultHandler resultHandlerToUse;
        synchronized (this) {
            // 这里运用到了锁的双重检测,判断当前请求的状态
            if (isSetOrExpired()) {
                return false;
            }
            // 将响应结果赋值给该对象
            this.result = result;
            // 该结果处理器就是在 WebAsyncManager 的 startDeferredResultProcessing() 方法中配置的
            // 赋值给本地变量,将对象的置空
            resultHandlerToUse = this.resultHandler;
            if (resultHandlerToUse == null) {
                return true;
            }
            this.resultHandler = null;
        }
        // 回调
        resultHandlerToUse.handleResult(result);
        return true;
    }

尾声

本文只是简单介绍了一下 SpringMVC 异步的相关知识,本来想在深层讲解一下 Servlet 异步的实现,但是发现内容实在太多了,放在本篇有点不合适,毕竟该系列是讲框架层的知识。所以打算另起一个专题,讲讲 Tomcat 的相关源码。

上一篇下一篇

猜你喜欢

热点阅读