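Tomcat Request Processing Analysis (1) - Processor
Having covered Tomcat's startup process and its components, this article begins to analyze how Tomcat handles requests. Recall the end of the article Tomcat Startup Analysis (6) - Acceptor and Poller: in the processSocket method of the AbstractEndpoint class, the task that the worker thread pool executes is a SocketProcessorBase. This article starts the analysis from SocketProcessorBase.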
SocketProcessorBase
SocketProcessorBase is an abstract class that implements the Runnable interface. Its run method calls the abstract doRun method:
public abstract class SocketProcessorBase<S> implements Runnable {
    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }

    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }

    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }

    protected abstract void doRun();
}
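The run/doRun split above is a template method guarded by a lock on the socket wrapper. The following is a minimal sketch of that idea, not Tomcat code: Wrapper and ProcessorBase are hypothetical stand-ins for SocketWrapperBase and SocketProcessorBase, reduced to the closed flag and the hook method.

```java
import java.util.ArrayList;
import java.util.List;

class SocketProcessorSketch {

    /** Hypothetical stand-in for SocketWrapperBase: only tracks the closed flag. */
    static class Wrapper {
        private volatile boolean closed;
        boolean isClosed() { return closed; }
        void close() { closed = true; }
    }

    /** Same shape as SocketProcessorBase: run() is final, doRun() is the hook. */
    abstract static class ProcessorBase implements Runnable {
        protected final Wrapper wrapper;

        ProcessorBase(Wrapper wrapper) { this.wrapper = wrapper; }

        @Override
        public final void run() {
            // One event at a time per wrapper; skip events for a closed socket.
            synchronized (wrapper) {
                if (wrapper.isClosed()) {
                    return;
                }
                doRun();
            }
        }

        protected abstract void doRun();
    }

    /** Runs a read event that closes the socket, then a write event for the same socket. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Wrapper shared = new Wrapper();
        Runnable read = new ProcessorBase(shared) {
            @Override protected void doRun() { log.add("read"); wrapper.close(); }
        };
        Runnable write = new ProcessorBase(shared) {
            @Override protected void doRun() { log.add("write"); }
        };
        read.run();
        write.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the write event is dropped because the read event closed the wrapper first, which is the case the comment in run describes.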
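The SocketProcessor returned by NioEndpoint's createSocketProcessor method is an inner class of NioEndpoint. It extends the abstract class SocketProcessorBase and implements doRun. According to the class comment, SocketProcessor plays the same role as Worker.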
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            int handshake = -1;
            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
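- The handshake variable and the related methods deal with the SSL/TLS handshake and are ignored for now; NioChannel's handshake method simply returns 0;
- The actual processing happens in the getHandler().process call.
The getHandler method is defined in the AbstractEndpoint class. The relevant code is: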
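The branches that follow the handshake in doRun can be read as a small decision table keyed on the handshake value. The sketch below restates only that table; the Action enum and the next method are hypothetical names introduced for illustration, while SelectionKey.OP_READ and SelectionKey.OP_WRITE are the real java.nio constants the original code compares against.

```java
import java.nio.channels.SelectionKey;

class HandshakeDispatchSketch {

    /** Hypothetical labels for what doRun does after the handshake step. */
    enum Action { PROCESS, CLOSE, REGISTER_READ, REGISTER_WRITE, NONE }

    /** Maps the handshake result to the branch taken in doRun. */
    static Action next(int handshake) {
        if (handshake == 0) {
            return Action.PROCESS;          // handshake done: call getHandler().process(...)
        } else if (handshake == -1) {
            return Action.CLOSE;            // handshake failed: close the socket
        } else if (handshake == SelectionKey.OP_READ) {
            return Action.REGISTER_READ;    // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return Action.REGISTER_WRITE;   // handshake needs to write more output
        }
        return Action.NONE;                 // no branch matches: nothing happens
    }

    public static void main(String[] args) {
        System.out.println(next(0));
        System.out.println(next(SelectionKey.OP_READ));
    }
}
```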
/**
 * Handling of accepted sockets.
 */
private Handler<S> handler = null;
public void setHandler(Handler<S> handler ) { this.handler = handler; }
public Handler<S> getHandler() { return handler; }
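When is the handler assigned? The answer lies in the creation of the Http11NioProtocol object: the parent class constructor invoked along the way sets the endpoint's handler: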
public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}
ConnectionHandler
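ConnectionHandler is a static inner class of AbstractProtocol. The code above calls its process method, which is too long to show here. Given my limited expertise, this article only analyzes the code for the most common scenario: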
- processor = getProtocol().createProcessor();
  createProcessor is an abstract method defined in AbstractProtocol, and AbstractHttp11Protocol implements it:
  @Override
  protected Processor createProcessor() {
      Http11Processor processor = new Http11Processor(getMaxHttpHeaderSize(),
              getAllowHostHeaderMismatch(), getRejectIllegalHeaderName(), getEndpoint(),
              getMaxTrailerSize(), allowedTrailerHeaders, getMaxExtensionSize(),
              getMaxSwallowSize(), httpUpgradeProtocols, getSendReasonPhrase(),
              relaxedPathChars, relaxedQueryChars);
      processor.setAdapter(getAdapter());
      processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
      processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
      processor.setDisableUploadTimeout(getDisableUploadTimeout());
      processor.setCompressionMinSize(getCompressionMinSize());
      processor.setCompression(getCompression());
      processor.setNoCompressionUserAgents(getNoCompressionUserAgents());
      processor.setCompressibleMimeTypes(getCompressibleMimeTypes());
      processor.setRestrictedUserAgents(getRestrictedUserAgents());
      processor.setMaxSavePostSize(getMaxSavePostSize());
      processor.setServer(getServer());
      processor.setServerRemoveAppProvidedValues(getServerRemoveAppProvidedValues());
      return processor;
  }
- state = processor.process(wrapper, status); this line delegates the rest of the request handling to the Processor's process method.
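The wiring described so far (the protocol's constructor installs a handler on the endpoint, and the handler asks the protocol for a Processor and delegates to it) can be sketched with a few stand-in types. Everything below is a simplified assumption for illustration: Endpoint, Handler, Protocol and the String socket are hypothetical, and the processor lookup and reuse that the real ConnectionHandler performs before creating a new Processor is left out.

```java
class ConnectionHandlerSketch {

    enum SocketState { OPEN, CLOSED }
    enum SocketEvent { OPEN_READ, DISCONNECT }

    /** Stand-in for the Processor interface. */
    interface Processor {
        SocketState process(String wrapper, SocketEvent status);
    }

    /** Stand-in for AbstractEndpoint: it only stores the handler. */
    static class Endpoint {
        private Handler handler;
        void setHandler(Handler handler) { this.handler = handler; }
        Handler getHandler() { return handler; }
    }

    /** Stand-in for ConnectionHandler: keeps a back reference to the protocol. */
    static class Handler {
        private final Protocol protocol;

        Handler(Protocol protocol) { this.protocol = protocol; }

        SocketState process(String wrapper, SocketEvent status) {
            Processor processor = protocol.createProcessor();
            return processor.process(wrapper, status);
        }
    }

    /** Stand-in for AbstractHttp11Protocol: wires the handler in its constructor. */
    static class Protocol {
        Protocol(Endpoint endpoint) {
            endpoint.setHandler(new Handler(this));
        }

        Processor createProcessor() {
            // Toy processor: keep the socket open after a read, close it otherwise.
            return (wrapper, status) ->
                    status == SocketEvent.OPEN_READ ? SocketState.OPEN : SocketState.CLOSED;
        }
    }

    /** What a worker thread does: fetch the handler from the endpoint and call process. */
    static SocketState demo(SocketEvent event) {
        Endpoint endpoint = new Endpoint();
        new Protocol(endpoint);
        return endpoint.getHandler().process("socket-1", event);
    }

    public static void main(String[] args) {
        System.out.println(demo(SocketEvent.OPEN_READ));
    }
}
```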
Processor
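The Processor interface handles requests for a protocol. Its class hierarchy is shown in the figure below.
[Figure: Processor class hierarchy]
The process method of Http11Processor is defined in its parent class AbstractProcessorLight. As the code below shows, it goes on to call the abstract service method to handle the request.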
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }
        // (some code omitted)
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);
    return state;
}
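For an ordinary, non-async request the loop body above reduces to a choice based on the socket event. The sketch below restates only that reduced case under stated assumptions: it ignores pending dispatches and the async/upgrade branch, the enums are local stand-ins with a subset of the real constants, and route is a hypothetical name, with the service call passed in as a Supplier.

```java
import java.util.function.Supplier;

class ProcessRoutingSketch {

    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }
    enum SocketState { OPEN, CLOSED, LONG }

    /** Event routing of process() for a processor that is neither async nor upgraded. */
    static SocketState route(SocketEvent status, Supplier<SocketState> service) {
        SocketState state = SocketState.CLOSED;   // same initial value as in process()
        if (status == SocketEvent.DISCONNECT) {
            // Nothing to do; the initial CLOSED state is returned.
        } else if (status == SocketEvent.OPEN_WRITE) {
            state = SocketState.LONG;             // stray write event, ignored
        } else if (status == SocketEvent.OPEN_READ) {
            state = service.get();                // the normal path: service(socketWrapper)
        } else {
            state = SocketState.CLOSED;           // event does not fit the processor state
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(route(SocketEvent.OPEN_READ, () -> SocketState.OPEN));
        System.out.println(route(SocketEvent.OPEN_WRITE, () -> SocketState.OPEN));
    }
}
```

Only OPEN_READ reaches the service call here, which matches the path this article follows.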
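Http11Processor provides its own implementation of service. It is too long to show here; the key step is the line getAdapter().service(request, response);.
Http11Processor's adapter is set after the object is created, using the value returned by AbstractProtocol's getAdapter method. AbstractProtocol's adapter, in turn, is assigned when the Connector is initialized: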
@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);
    // (some code omitted)
}
The service method of CoyoteAdapter is as follows:
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);
        // Link objects
        request.setResponse(response);
        response.setRequest(request);
        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);
        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }
    boolean async = false;
    boolean postParseSuccess = false;
    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }
    } catch (IOException e) {
        // Ignore
    } finally {
        // (some code omitted)
    }
}
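The first part of service uses the notes of the Coyote request as a per-request cache: the container-level Request is created once and found again through the ADAPTER_NOTES slot afterwards. A minimal sketch of that lookup-or-create step follows. CoyoteRequest and ContainerRequest are hypothetical stand-ins, and both the size of the notes array and the slot index 1 are assumptions made for this example.

```java
class AdapterNotesSketch {

    /** Slot index used by this sketch; the concrete value is an assumption. */
    static final int ADAPTER_NOTES = 1;

    /** Stand-in for org.apache.coyote.Request: just an array of notes. */
    static class CoyoteRequest {
        private final Object[] notes = new Object[8];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    /** Stand-in for the container-level Request that wraps the Coyote request. */
    static class ContainerRequest {
        final CoyoteRequest coyoteRequest;
        ContainerRequest(CoyoteRequest coyoteRequest) { this.coyoteRequest = coyoteRequest; }
    }

    /** Returns the cached wrapper, creating and storing it on first use. */
    static ContainerRequest lookupOrCreate(CoyoteRequest req) {
        ContainerRequest request = (ContainerRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ContainerRequest(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        // The second lookup returns the object created by the first one.
        System.out.println(lookupOrCreate(req) == lookupOrCreate(req));
    }
}
```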
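Note the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):
- Calling getService on the Connector returns the StandardService;
- Calling getContainer on the StandardService returns the StandardEngine;
- Calling getPipeline on the StandardEngine returns its associated StandardPipeline;
- As noted in Tomcat Startup Analysis (7) - Container and Related Components, the StandardEngine constructor adds the basic valve StandardEngineValve to its own pipeline. Calling getFirst on the StandardPipeline returns the first valve, which starts handling the request. Because the basic valve is always the last one in the pipeline, the request is ultimately handled by the basic valve; see the next article.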
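The relationship between getFirst and the basic valve can be sketched as a singly linked chain in which the basic valve is always the tail. This is a simplified illustration rather than the StandardPipeline source: the Valve interface is cut down to what the example needs, the valve names are made up, and the sketch assumes setBasic is called before any addValve.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {

    /** Reduced valve contract: record a visit, then pass the call on. */
    static class Valve {
        private final String name;
        private Valve next;

        Valve(String name) { this.name = name; }
        Valve getNext() { return next; }
        void setNext(Valve next) { this.next = next; }

        void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(trace);
            }
        }
    }

    /** Keeps added valves in front of the basic valve. */
    static class Pipeline {
        private Valve first;
        private Valve basic;

        void setBasic(Valve valve) { this.basic = valve; }

        void addValve(Valve valve) {
            if (first == null) {
                first = valve;
            } else {
                Valve current = first;
                while (current.getNext() != basic) {
                    current = current.getNext();
                }
                current.setNext(valve);
            }
            valve.setNext(basic);   // the basic valve stays last
        }

        /** With no added valves, the basic valve is also the first one. */
        Valve getFirst() { return first != null ? first : basic; }
    }

    /** Builds a pipeline, optionally adds one extra valve, and records the call order. */
    static List<String> trace(boolean withExtraValve) {
        Pipeline pipeline = new Pipeline();
        pipeline.setBasic(new Valve("basic"));
        if (withExtraValve) {
            pipeline.addValve(new Valve("extra"));
        }
        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

In both runs the basic valve is the last entry of the trace, whether or not other valves were added in front of it.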