Tomcat源码分析2 之 Protocol实现分析
简介
本文继续以tomcat8 为例,简单分析下Tomcat的几种protocol的差异,以及处理链接的细节。
主要有以下内容:</br>
- tomcat protocol的分类</br>
- tomcat protocol的实现</br>
- 各个protocol的差异</br>
Tomcat protocol 配置
参考官方文档,tomcat protocol配置,可以看到protocol
主要是有四种,默认使用的HTTP/1.1 ,对于tomcat8 以更高版本来说,HTTP/1.1
的配置会默认使用nio来处理,也就是org.apache.coyote.http11.Http11NioProtocol
。
其他的几种:</br>
org.apache.coyote.http11.Http11Protocol
java的bio connector,使用ServerSocket
处理请求。</br>
org.apache.coyote.http11.Http11NioProtocol
java的nio connector,使用SocketChannel
处理请求.</br>
org.apache.coyote.http11.Http11Nio2Protocol
java7新出的aio connector,使用AsynchronousSocketChannel
处理请求。</br>
org.apache.coyote.http11.Http11Nio2Protocol
tomcat的native library connector。(这个我们稍后再讲)</br>
对于tomcat8 以更高版本来说,HTTP/1.1的配置会默认使用nio来处理,也就是
org.apache.coyote.http11.Http11NioProtocol
。由org.apache.catalina.connector.Connector
的setProtocol()
方法也可以看到:
public void setProtocol(String protocol) {
//如果配置了apr,会默认使用apr(apr后面再讲)
if (AprLifecycleListener.isAprAvailable()) {
if ("HTTP/1.1".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpAprProtocol");
} else if (protocol != null) {
setProtocolHandlerClassName(protocol);
} else {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
}
} else {
//这里可以看到,HTTP/1.1是server.xml的默认配置,会默认使用nio处理
if ("HTTP/1.1".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11NioProtocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpNioProtocol");
} else if (protocol != null) {
//其他情况下,使用指定的protocol
setProtocolHandlerClassName(protocol);
}
}
}
Tomcat protocol的处理流程
在Connector
的startInternal()
方法会启动protocol
@Override
protected void startInternal() throws LifecycleException {
// Validate settings before starting
...
try {
protocolHandler.start();
} catch (Exception e) {
...
}
}
这里的protocolHandler,就是上面setProtocol()方法指定的protocol。暂时以Http11NioProtocol
为例,分析下请求处理流程。上面startInternal()
方法接下来会到org.apache.coyote.AbstractProtocol
的start()
方法。
@Override
public void start() throws Exception {
if (getLog().isInfoEnabled())
getLog().info(sm.getString("abstractProtocolHandler.start",
getName()));
try {
//每个protocol都有一个对应的enpoint,最终是由endpoint来负责处理链接的。
endpoint.start();
} catch (Exception ex) {
getLog().error(sm.getString("abstractProtocolHandler.startError",
getName()), ex);
throw ex;
}
}
每个protocol对应了一个Endpoint</br>
Http11Protocol
对应org.apache.tomcat.util.net.JIoEndpoint
</br>
Http11NioProtocol
对应org.apache.tomcat.util.net.NioEndpoint
</br>
Http11Nio2Protocol
对应org.apache.tomcat.util.net.Nio2Endpoint
</br>
继续查看Nio2Endpoint
的startInternal()
方法.
@Override
public void startInternal() throws Exception {
if (!running) {
...
//初始化最大连接数限制,在server.xml中可配置
initializeConnectionLatch();
// Start poller threads
// poller 主要负责检查各个 Selector 的状态以及处理超时等
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//启动接受链接的线程
startAcceptorThreads();
}
}
startAcceptorThreads()
方法会启动Endpoint
内部的Acceptor
。继续查看org.apache.tomcat.util.net.NioEndpoint.Acceptor
的run()
方法:
@Override
public void run() {
...
// Loop until we receive a shutdown command
while (running) {
...
try {
//检查下当前是否已经达到了最大链接数
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server socket
socket = serverSock.accept();
} catch (IOException ioe) {
...
}
// Successful accept, reset the error delay
errorDelay = 0;
// setSocketOptions() will add channel to the poller if successful
// setSocketOptions 会把channel添加到poller
if (running && !paused) {
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
} ...
}
state = AcceptorState.ENDED;
}
}
从代码中可以看到,使用 SocketChannel 接受连接,然后通过 setSocketOptions 方法把 channel 交给 poller 处理。setSocketOptions(SocketChannel socket)
方法则是真正处理链接的地方。
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
...
//会把 调用poller的 register 方法,把 channel 交给 poller
getPoller0().register(channel);
} catch (Throwable t) {
...
}
return true;
}
getPoller0() 会根据设置的poll线程数,返回一个当前可用的 Poller 对象,使用Round robin算法实现一个简单的负载均衡。
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
接下来查看 register()
方法
public void register(final NioChannel socket) {
socket.setPoller(this);
// 新建一个 KeyAttachment 对象,保存跟socket相关的一些信息
KeyAttachment ka = new KeyAttachment(socket);
ka.setPoller(this);
ka.setTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
//比较重要的是这句,把 r 添加到当前的 events 队列中。
addEvent(r);
}
上面可以看到,每个请求进来之后,会通过 register 方法创建一个 PollerEvent 对象并添加到当前的 SynchronizedQueue<PollerEvent> events 队列中。
接下来看 Poller的run()
方法,上面已经说过,Poller 在startInternal()
方法调用的时候创建并启动。
@Override
public void run() {
// Loop until destroy() is called
while (true) {
try {
...
boolean hasEvents = false;
// Time to terminate?
if (close) {
events();
timeout(0, false);
...
break;
} else {
//调用 events() 方法
hasEvents = events();
}
}
...
}//while
stopLatch.countDown();
}
接下来查看events()方法
public boolean events() {
boolean result = false;
PollerEvent pe = null;
while ( (pe = events.poll()) != null ) {
result = true;
try {
//调用 PollerEvent 的 run() 方法
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
可以看到,在tomcat启动之后,会启动相应的endpoint的Acceptor来接受请求,同时启动相应的Poller来处理请求。</br>
PollerEvent的run方法
@Override
public void run() {
//对于新增的链接,会注册给 poller 的 selector 处理。
if ( interestOps == OP_REGISTER ) {
try {
//把当前的key注册给poller中的selector对象,准备后续处理。
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
...
}//end if
}//run
再回过来看 Poller 的 run() 方法。里面有一段
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
processKey(sk, attachment);
}
}//while
会检查selector
所有的key,然后处理调用processKey()
进行处理。
processKey()
会根据配置的Executer
调用SocketProcessor
的 run()
方法.
接下来看SocketProcessor
的run()
方法最终会调用doRun()
方法,在doRun()
里有如下代码:
if (status == null) {
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
接下来会调用到handler
的process
方法,这个handler就是在Http11NioProtocol
的构造方法里由setHandler
设置的handler,也就是Http11ConnectionHandler
继承了 AbstractConnectionHandler
,接着来到 org.apache.coyote.AbstractProtocol.AbstractConnectionHandler
的 process
方法,这里会对每个socket做处理。包括调用具体的servlet处理业务等等。
总结
tomcat在启动的时候,根据配置的protocol,启动不同的Endpoint中的Acceptor 和 Poller。Acceptor负责接受请求,Poller负责调用线程池执行业务。