tomcat源码分析
2018-12-14 本文已影响0人
简书徐小耳
Bootstrap 的 main 方法是 tomcat 的启动入口,其中的 daemon 就是 Bootstrap 类的实例
public static void main(String args[]) {
synchronized (daemonLock) {
if (daemon == null) {
首先new一个对象,该对象会触发Bootstrap的类初始化(即执行static代码块和静态变量)
不过这个main方法就是在Bootstrap类中执行的所以会在main方法执行前执行初始化操作
设置catalina.home和catalina.base的文件目录path
Bootstrap bootstrap = new Bootstrap();
try {
执行初始化方法,初始化三个类加载器,并实例化Catalina
bootstrap.init();
} catch (Throwable t) {
handleThrowable(t);
t.printStackTrace();
return;
}
daemon = bootstrap;
} else {
设置线程上下文
Thread.currentThread().setContextClassLoader(daemon.catalinaLoader);
}
}
try {
一般启动额时候我们会执行start
String command = "start";
if (args.length > 0) {
command = args[args.length - 1];
}
if (command.equals("startd")) {
args[args.length - 1] = "start";
daemon.load(args);
daemon.start();
} else if (command.equals("stopd")) {
args[args.length - 1] = "stop";
daemon.stop();
} else if (command.equals("start")) {
这个await标识后期是用来辅助tomcat进行退出的,即设置catalina类的await
daemon.setAwait(true);
最终还是调用catalina类的的load方法
daemon.load(args);
最终还是调用catalina类的的start方法
daemon.start();
if (null == daemon.getServer()) {
System.exit(1);
}
} else if (command.equals("stop")) {
daemon.stopServer(args);
} else if (command.equals("configtest")) {
daemon.load(args);
if (null == daemon.getServer()) {
System.exit(1);
}
System.exit(0);
} else {
log.warn("Bootstrap: command \"" + command + "\" does not exist.");
}
} catch (Throwable t) {
// Unwrap the Exception for clearer error reporting
if (t instanceof InvocationTargetException &&
t.getCause() != null) {
t = t.getCause();
}
handleThrowable(t);
t.printStackTrace();
System.exit(1);
}
}
Bootstrap的初始化操作 就是设置catalina.home和catalina.base的文件目录path
// Static initialiser: resolves the catalina.home and catalina.base
// directories and publishes both as system properties.
static {
    // The JVM's current working directory.
    String userDir = System.getProperty("user.dir");
    // An explicitly configured home directory, if any.
    String home = System.getProperty(Globals.CATALINA_HOME_PROP);
    File homeFile = null;
    // getCanonicalFile() resolves the path to its unique, OS-specific
    // canonical form; getAbsoluteFile() does not, so it is only the fallback.
    if (home != null) {
        // Wraps the configured path; no file is created on disk here.
        File f = new File(home);
        try {
            homeFile = f.getCanonicalFile();
        } catch (IOException ioe) {
            homeFile = f.getAbsoluteFile();
        }
    }
    if (homeFile == null) {
        // First fall-back. If bootstrap.jar sits in the working directory
        // (i.e. we are probably in Tomcat's bin directory), use its parent.
        File bootstrapJar = new File(userDir, "bootstrap.jar");
        if (bootstrapJar.exists()) {
            File f = new File(userDir, "..");
            try {
                homeFile = f.getCanonicalFile();
            } catch (IOException ioe) {
                homeFile = f.getAbsoluteFile();
            }
        }
    }
    if (homeFile == null) {
        // Second fall-back. Use current directory
        File f = new File(userDir);
        try {
            homeFile = f.getCanonicalFile();
        } catch (IOException ioe) {
            homeFile = f.getAbsoluteFile();
        }
    }
    catalinaHomeFile = homeFile;
    // Publish the resolved home path as the catalina.home property.
    System.setProperty(
            Globals.CATALINA_HOME_PROP, catalinaHomeFile.getPath());
    // catalina.base defaults to catalina.home when it is not configured.
    String base = System.getProperty(Globals.CATALINA_BASE_PROP);
    if (base == null) {
        catalinaBaseFile = catalinaHomeFile;
    } else {
        File baseFile = new File(base);
        try {
            baseFile = baseFile.getCanonicalFile();
        } catch (IOException ioe) {
            baseFile = baseFile.getAbsoluteFile();
        }
        catalinaBaseFile = baseFile;
    }
    // Publish the resolved base path as the catalina.base property.
    System.setProperty(
            Globals.CATALINA_BASE_PROP, catalinaBaseFile.getPath());
}
bootstrap的init方法
public void init() throws Exception {
初始化commonclassloader,catalinaLoader,sharedclasslaoder
initClassLoaders();
设置catalinaLoader线程上下文的classloader,一般情况下catalinaLoader会加载class专门给tomcat用
但是这边catalinaLoader与commonclassloader和sharedclasslaoder一样
Thread.currentThread().setContextClassLoader(catalinaLoader);
预加载一些class 避免AccessControlException
SecurityClassLoad.securityClassLoad(catalinaLoader);
获取Catalina类的实例并调用setParentLoader方法
if (log.isDebugEnabled())
log.debug("Loading startup class");
不会触发类的初始化
Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.getConstructor().newInstance();
// Set the shared extensions class loader
准备调用Catalina的setParentClassLoader,即设置sharedLoader为catalina的parentClassLoader
if (log.isDebugEnabled())
log.debug("Setting startup class properties");
String methodName = "setParentClassLoader";
Class<?> paramTypes[] = new Class[1];
paramTypes[0] = Class.forName("java.lang.ClassLoader");
Object paramValues[] = new Object[1];
paramValues[0] = sharedLoader;
Method method =
startupInstance.getClass().getMethod(methodName, paramTypes);
method.invoke(startupInstance, paramValues);
catalinaDaemon = startupInstance;
}
catalina的load方法
public void load() {
loaded代表是否加载过
if (loaded) {
return;
}
loaded = true;
long t1 = System.nanoTime();
获取并创建temp目录
initDirs();
// Before digester - it may be needed
//设置naming有可能是的JNI
initNaming();
创建并执行我们的Digester,主要是解析我们的xml
Digester digester = createStartDigester();
InputSource inputSource = null;
InputStream inputStream = null;
File file = null;
try {
try {
创建server.xml的file
file = configFile();
inputStream = new FileInputStream(file);
即将该url转化成唯一标识
inputSource = new InputSource(file.toURI().toURL().toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail", file), e);
}
}
if (inputStream == null) {
try {
如果是未获取到inputStream 我们再以相对路径的方式去获取
inputStream = getClass().getClassLoader()
.getResourceAsStream(getConfigFile());
inputSource = new InputSource
(getClass().getClassLoader()
.getResource(getConfigFile()).toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail",
getConfigFile()), e);
}
}
}
// This should be included in catalina.jar
// Alternative: don't bother with xml, just create it manually.
如果还没有xml 那么这个时候我们去寻找内置的xml
if (inputStream == null) {
try {
inputStream = getClass().getClassLoader()
.getResourceAsStream("server-embed.xml");
inputSource = new InputSource
(getClass().getClassLoader()
.getResource("server-embed.xml").toString());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("catalina.configFail",
"server-embed.xml"), e);
}
}
}
如果inputStream 和inputSource 都没有 那么 则打印日志直接返回
if (inputStream == null || inputSource == null) {
if (file == null) {
log.warn(sm.getString("catalina.configFail",
getConfigFile() + "] or [server-embed.xml]"));
} else {
log.warn(sm.getString("catalina.configFail",
file.getAbsolutePath()));
if (file.exists() && !file.canRead()) {
log.warn("Permissions incorrect, read permission is not allowed on the file.");
}
}
return;
}
准备 让digester 按照规则解析xml
try {
inputSource.setByteStream(inputStream);
digester.push(this);
digester.parse(inputSource);
} catch (SAXParseException spe) {
log.warn("Catalina.start using " + getConfigFile() + ": " +
spe.getMessage());
return;
} catch (Exception e) {
log.warn("Catalina.start using " + getConfigFile() + ": " , e);
return;
}
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
// Ignore
}
}
}
设置server的catalina属性,其中server在解析xml的时候已经创建了
getServer().setCatalina(this);
设置catalinahome和catalinabase的属性
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());
// Stream redirection
SystemLogHandler调用startCapture 会使用caputrelog里面的printStream,该
stream不会自动flush 且因为是线程独有的 所以不会被阻塞,当结束后调用stopCapture 把之前使用
System.out的打印都返回 采用其他方式一次性打印出来
initStreams();
// Start the new server
try {
初始化server,注意这边的init是LifecycleMBeanBase的init方法
getServer().init();
} catch (LifecycleException e) {
if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
throw new java.lang.Error(e);
} else {
log.error("Catalina.start", e);
}
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info("Initialization processed in " + ((t2 - t1) / 1000000) + " ms");
}
}
Lifecycle的init
public final synchronized void init() throws LifecycleException {
if (!state.equals(LifecycleState.NEW)) {
invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
}
try {
设置state为INITIALIZING同时触发对应的监听事件
setStateInternal(LifecycleState.INITIALIZING, null, false);
初始化真正的逻辑
initInternal();
设置state为INITIALIZED同时触发对应的监听事件
setStateInternal(LifecycleState.INITIALIZED, null, false);
} catch (Throwable t) {
handleSubClassException(t, "lifecycleBase.initFail", toString());
}
}
StandardServer的initInternal
protected void initInternal() throws LifecycleException {
这边生成MBeanServer,即方便后期使用JMX查看tomcat的一些信息
ObjectName 是MBean在MBeanServer中的唯一标示
super.initInternal();
注册一个StringCache的Mbean
onameStringCache = register(new StringCache(), "type=StringCache");
注册一个MBeanFactory
MBeanFactory factory = new MBeanFactory();
factory.setContainer(this);
onameMBeanFactory = register(factory, "type=MBeanFactory");
globalNamingResources 执行初始化 跟standServer相似只是initInternal不太一样,
其内部主要是注册一些Mbean到自己的MBeanServer中
globalNamingResources.init();
// Populate the extension validator with JARs from common and shared
从我们shared(和shared的父类common,这边去除了系统类加载器)的classloader加载的jar包中检测是否有mainfest文件 如果有包装成对象加入集合
if (getCatalina() != null) {
我们在bootstrap的init的中设置了catalina的ParentClassLoader是shareclassloader
ClassLoader cl = getCatalina().getParentClassLoader();
// Walk the class loader hierarchy. Stop at the system class loader.
// This will add the shared (if present) and common class loaders
while (cl != null && cl != ClassLoader.getSystemClassLoader()) {
if (cl instanceof URLClassLoader) {
URL[] urls = ((URLClassLoader) cl).getURLs();
for (URL url : urls) {
if (url.getProtocol().equals("file")) {
try {
File f = new File (url.toURI());
if (f.isFile() &&
f.getName().endsWith(".jar")) {
ExtensionValidator.addSystemResource(f);
}
} catch (URISyntaxException e) {
// Ignore
} catch (IOException e) {
// Ignore
}
}
}
}
cl = cl.getParent();
}
}
我们的services内容在解析xml的时候就放入进行,即是包含connector和container(enigne)
for (int i = 0; i < services.length; i++) {
这边是Service的初始化话
services[i].init();
}
}
JMX的相关方法
/**
 * Registers this component with JMX unless a registration has already
 * happened (signalled by a non-null {@code oname}).
 *
 * @throws LifecycleException declared by the lifecycle contract
 */
@Override
protected void initInternal() throws LifecycleException {
    // A non-null oname means registration already happened via preRegister().
    if (oname != null) {
        return;
    }
    mserver = Registry.getRegistry(null, null).getMBeanServer();
    oname = register(this, getObjectNameKeyProperties());
}
StandardService的initInternal 依次初始化engine、executor和connector
protected void initInternal() throws LifecycleException {
JMX的相关操作
super.initInternal();
container 容器的初始化,注意这边只是初始化了顶级容器engine ,子容器还未初始化
if (engine != null) {
初始化Realm 如果没有就设置一个NullRealm,默认创建LockOutRealm
阻止别人暴力破解user密码,其设置密码错误锁定时长等,内部包含UserDatabaseRealm
其是存储用户登录的权限认定
engine.init();
}
初始化线程池 我们的tomcat的 connector 可以配置这个属性
for (Executor executor : findExecutors()) {
if (executor instanceof JmxEnabled) {
((JmxEnabled) executor).setDomain(getDomain());
}
这个init只是注册一个MbeanServer和Mbean
executor.init();
}
这个init只是注册一个MbeanServer和Mbean
mapperListener.init();
我们在上面已经初始化好了container,那么这边我就初始化connector
synchronized (connectorsLock) {
for (Connector connector : connectors) {
connector.init();
}
}
}
Connector的初始化
protected void initInternal() throws LifecycleException {
JMX相关
super.initInternal();
protocol 是在server.xml中配置在了connector里面的元素
if (protocolHandler == null) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
}
// Initialize adapter
CoyoteAdapter 可主要是生成HttpServletRequest和HttpServletResponse
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
// Make sure parseBodyMethodsSet has a default
设置一个parseBodyMethodsSet (解析请求体的方法),默认是POST
if (null == parseBodyMethodsSet) {
setParseBodyMethods(getParseBodyMethods());
}
如果是APR协议但是APR监听者却不可用则抛出异常
if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
如果APR监听者可用且需要设置SSL且协议是Http11JsseProtocol 则增加SSL
if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
protocolHandler instanceof AbstractHttp11JsseProtocol) {
AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
(AbstractHttp11JsseProtocol<?>) protocolHandler;
if (jsseProtocolHandler.isSSLEnabled() &&
jsseProtocolHandler.getSslImplementationName() == null) {
// OpenSSL is compatible with the JSSE configuration, so use it if APR is available
jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
}
}
try {
协议的初始化
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}
protocolHandler(协议)的初始化
@Override
public void init() throws Exception {
先升级协议在初始化
for (UpgradeProtocol upgradeProtocol : upgradeProtocols) {
configureUpgradeProtocol(upgradeProtocol);
}
真正初始化
super.init();
}
public void init() throws Exception {
打印port的offset
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
logPortOffset();
}
如果ObjectName 为null 则创建一个 并注册到对应的MBeanServer
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname, null);
}
}
给domain也创建对应的MBean
if (this.domain != null) {
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null);
}
domain 只是做为JMX注册使用
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
首先选择对应的bind,创建对应的监听端口的socekt,然后选择一个shared的selector,设置pollerThreadCount(对应netty的worker专门去处理一系列socket的事件),acceptorThreadCount(对应netty的boss)
endpoint.init();
}
catalina的start方法
public void start() {
如果server不存在 重新调用load
if (getServer() == null) {
load();
}
如果还是为空直接返回
if (getServer() == null) {
log.fatal("Cannot start server. Server instance is not configured.");
return;
}
long t1 = System.nanoTime();
启动server,观察者模式会启动所有的子类
try {
getServer().start();
} catch (LifecycleException e) {
log.fatal(sm.getString("catalina.serverStartFail"), e);
try {
getServer().destroy();
} catch (LifecycleException e1) {
log.debug("destroy() failed for failed Server ", e1);
}
return;
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
}
注册关闭时候的钩子,可以设置我们想注册的钩子
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
// If JULI is being used, disable JULI's shutdown hook since
// shutdown hooks run in parallel and log messages may be lost
// if JULI's hook completes before the CatalinaShutdownHook()
LogManager logManager = LogManager.getLogManager();
if (logManager instanceof ClassLoaderLogManager) {
((ClassLoaderLogManager) logManager).setUseShutdownHook(
false);
}
}
这边就是让tomcat进程不关闭
if (await) {
如果是-2 说明不需要启动 直接关闭
这边根据我们的端口的开放选择不同的关闭方式,如果端口是-1
代表是内置的tomcat,这个时候无法通过socket监听,只能通过线程不停的轮询一个标识
如果不是-1 则可以开启一个socekt 监听命令 如果是关闭 就跳出循环 执行stop
await();
关闭服务,如果存在钩子 先删除钩子 在关闭,防止重复关闭
stop();
}
}
server的start方法
protected void startInternal() throws LifecycleException {
触发监听事件和设置state
fireLifecycleEvent(CONFIGURE_START_EVENT, null);
setState(LifecycleState.STARTING);
启动globalNamingResources
这边 触发监听事件和设置state
globalNamingResources.start();
启动server
synchronized (servicesLock) {
for (int i = 0; i < services.length; i++) {
services[i].start();
}
}
}
service的start方法
protected void startInternal() throws LifecycleException {
设置state
if(log.isInfoEnabled())
log.info(sm.getString("standardService.start.name", this.name));
setState(LifecycleState.STARTING);
同步启动engine
// Start our defined Container first
if (engine != null) {
synchronized (engine) {
engine.start();
}
}
同步启动executor
synchronized (executors) {
for (Executor executor: executors) {
executor.start();
}
}
启动mapperListener
mapperListener.start();
启动非失败的connector
synchronized (connectorsLock) {
for (Connector connector: connectors) {
// If it has already failed, don't try and start it
if (connector.getState() != LifecycleState.FAILED) {
connector.start();
}
}
}
}
engine的start,即调用ContainerBase的startInternal
protected synchronized void startInternal() throws LifecycleException {
// Start our subordinate components, if any
logger = null;
getLogger();
Cluster cluster = getClusterInternal();
if (cluster instanceof Lifecycle) {
((Lifecycle) cluster).start();
}
Realm realm = getRealmInternal();
if (realm instanceof Lifecycle) {
((Lifecycle) realm).start();
}
// Start our child containers, if any
Container children[] = findChildren();
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < children.length; i++) {
results.add(startStopExecutor.submit(new StartChild(children[i])));
}
MultiThrowable multiThrowable = null;
for (Future<Void> result : results) {
try {
等待子类容器启动成功
result.get();
} catch (Throwable e) {
log.error(sm.getString("containerBase.threadedStartFailed"), e);
if (multiThrowable == null) {
multiThrowable = new MultiThrowable();
}
multiThrowable.add(e);
}
}
if (multiThrowable != null) {
throw new LifecycleException(sm.getString("containerBase.threadedStartFailed"),
multiThrowable.getThrowable());
}
四个基础阀门放在各自容器管道的最后
其他的valve 可以执行一些其他操作 比如打印日志之类的
if (pipeline instanceof Lifecycle) {
((Lifecycle) pipeline).start();
}
设置状态
setState(LifecycleState.STARTING);
// Start our thread
启动线程,主要是执行ContainerBackgroundProcessor,执行backGroundProcessor
threadStart();
}
connector的start方法
protected void startInternal() throws LifecycleException {
查看端口,小于0则抛出错误
if (getPortWithOffset() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
}
setState(LifecycleState.STARTING);
try {
启动协议
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
protocolHandler的start
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
logPortOffset();
}
启动endpoint,主要是绑定端口,并启动一些监听链接的请求
endpoint.start();
开启异步timeout的线程
检测超时的请求,并将该请求再转发到工作线程池处理
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
如果该endpoint还未绑定 则开始绑定,不过已经在endpoint的init的时候绑定了
/**
 * Starts the endpoint, binding it first when it is still unbound.
 *
 * @throws Exception if binding or the endpoint-specific start fails
 */
public final void start() throws Exception {
    final boolean stillUnbound = (bindState == BindState.UNBOUND);
    if (stillUnbound) {
        // Bind now and record that the binding happened during start().
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}
endpoint的start方法
public void startInternal() throws Exception {
如果还未运行则开始运行
if (!running) {
running = true;
paused = false;
三个缓存用同步栈保存
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
创建真正执行请求的线程
if ( getExecutor() == null ) {
createExecutor();
}
初始化最大请求数,可以在server.xml配置
initializeConnectionLatch();
开启pollers线程 我们通过acceptor线程 把socket注册到poller上面
poller 内部持有一个selector 轮询发送的事件然后将事件交给executor处理
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
创建acceptor 该线程遇到请求tomcat的socket 将其注册到poller上
startAcceptorThreads();
}
}
通过下图可知,acceptor 使用单线程即可
(图片:image.png)
accept线程启动和run方法
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
获取acceptor的数量,创建并启动
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
- acceptor的run方法
public void run() {
int errorDelay = 0;
一直循环除非我们接收到关闭命令
while (endpoint.isRunning()) {
如果可以暂停且是正在运行就沉睡一会
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
如果不在运行了 直接退出
if (!endpoint.isRunning()) {
break;
}
设置为Running
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
检测下 如果我们的connection已经达到最大值就等待,一般到这一步说我们已经准备去获取一个connection
从这也可以看出一个aceeptor会占着一个connection,那么如果我们启动太多的acceptor 也不会太好
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
有可能在等待connection的时候 已经暂停了 所以需要检查下
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
获取请求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
如果发生异常就释放connect
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
这个标识是当发生异常的时候让线程sleep的
errorDelay = 0;
配置socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
setSocketOptions方法 会将socket送给一个合适的processor
if (!endpoint.setSocketOptions(socket)) {
如果失败了就关闭socket
endpoint.closeSocket(socket);
}
} else {
如果endpoint不是isRunning或者isPaused 则直接关闭socket
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
设置socket
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
设置该channel为非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
给socket设置一些属性比如缓冲区大小,tcpNoDelay,soTimeOut
socketProperties.setProperties(sock);
从同步栈中获取NioChannel的对象,其实数组
NioChannel channel = nioChannels.pop();
如果为空则根据条件创建NioChannel
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
如果已经存在对象 则设置下socket并清除buf
channel.setIOChannel(socket);
channel.reset();
}
将该channel注册到poller线程,由他去轮训该socket的事件
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
public void register(final NioChannel socket) {
将NioChannel 和poller绑定
socket.setPoller(this);
把NioChannel NioEndpoint和poller包装成对象 NioSocketWrapper
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
绑定NioChannel 和NioSocketWrapper
socket.setSocketWrapper(ka);
ka.setPoller(this);
设置后期该channel的timeout和keepalive ssl
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
从缓存获取PollerEvent ,PollerEvent 是一个runnable
PollerEvent r = eventCache.pop();
代表对read事件感兴趣
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
如果PollerEvent为空 就new一个
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
添加event
addEvent(r);
}
private void addEvent(PollerEvent event) {
events.offer(event);
如果wakeupCounter=-1 则唤醒selector
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
poller的run方法
public void run() {
while (true) {
boolean hasEvents = false;
try {
如果未close
if (!close) {
检测我们之前注册的event,然后调用所有event的run方法该方法会根据注册到selector上或者
检测我们selectedKey是否过期 以及注册该channel感兴趣的事情
调用完了就清除PollerEvent的属性
hasEvents = events();
如果wakeupCounter大于0,说明有event加入进来
if (wakeupCounter.getAndSet(-1) > 0) {
非阻塞查询
keyCount = selector.selectNow();
} else {
阻塞查询查询
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
如果关闭
if (close) {
events();
清理selector和key
timeout(0, false);
try {
关闭selector
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
我们超时或者醒来 我们先处理events
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
遍历可以读取的keys 分派任何的激活的的event
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
处理事件的入口
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
poller 释放
getStopLatch().countDown();
}
pollerEvent的run方法
public void run() {
如果是注册事件 先注册
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
否则我们去selector寻找一个SelectionKey
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
如果Key==null 我们就释放该请求
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
注册感兴趣的事
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
}
处理请求的地方
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
如果close 就cancel
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
如果可读或者可写就继续处理 否则cancelledKey
if (sk.isReadable() || sk.isWritable() ) {
如果是文件上传 调用processSendfile
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
取消注册的事件
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
下面就是先读后写
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
dispatch是否启用线程,event代表是读写事件,socketWrapper包含了socket 等信息
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
从缓存获取Processor
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
获取线程池
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
最终是通过这个run方法进行执行
SocketProcessorBase的run方法
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
try {
if (key != null) {
是否需要握手
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
开始握手
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
handshake =0 才能出来event
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
他就是处理的地方,最终调用了CoyoteAdapter的service方法进行处理
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error("", t);
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
}