java面试总结

源码分析Tomcat监听并接受Socket连接

2018-11-16  本文已影响55人  千本樱散落

关注微信公众号


源码分析Tomcat监听并接受Socket连接

tomcat建立Socket监听TCP连接,获取连接后委托SocketProcessor处理器处理后续流程

流程:

1、启动tomcat
2、启动tomcat完成后调用NioEndpoint的startInternal()方法初始化操作
3、createExecutor():初始化创建10个工作线程的线程池,此线程池最大只能创建200个线程,最小空闲工作线程为10
4、设置tomcat并发最大连接数为10000
5、创建并运行2个Poller守护线程,其作用是将Acceptor接收的连接请求委托给SocketProcessor处理器
6、startAcceptorThreads():创建一个Acceptor线程对象,用于监听网络TCP/IP连接,并将Acceptor线程对象作为一个守护线程启动,启动线程后就一直做死循环监听TCP连接
7、如果当前连接数大于10000,则等待其他连接释放,如果当前连接数小于10000,则继续后续操作
8、socket = serverSock.accept()等待TCP网络请求进来
9、socket = serverSock.accept();有返回值时,说明有一个TCP连接进来,然后执行setSocketOptions(),将当前连接注册到Poller中
10、getPoller0().register(channel);由于Poller中的两个线程一直在运行,所以当channel注册到Poller中时,Poller中的selector.select()就会得到当前连接
11、Poller.run()方法中,当selector.select()有返回值时,循环遍历每个值SelectionKey,然后调用processKey(SelectionKey,attachment)
12、processKey()方法中调用processSocket()方法,创建一个SocketProcessorBase线程对象处理器来处理连接请求
13、从工作线程池中获取一个工作线程来运行SocketProcessorBase线程对象,其具体实现在NioEndpoint的内部类SocketProcessor.doRun()

NioEndpoint源码分析:

tomcat容器启动成功后,会开启一个NIO端点,创建含有10个工作线程(http-nio-8080-exec-xxx)的线程池,最大并发连接数为10000,然后创建大小为2的Poller线程对象数组,并启动数组中的2个守护线程,然后启动一个Acceptor线程监听网络TCP/IP的连接请求,监听到请求后,注册到Poller线程对象中,Poller线程对象委托SocketProcessor处理请求

startInternal()源码对应上述流程第1-5步:

/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {
    if (!running) {
        //启动Acceptor线程时,会判断running和paused 状态
        running = true;
        paused = false;
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());
        // Create worker collection
        //创建10个工作线程
        if ( getExecutor() == null ) {
            createExecutor();
        }
        //初始化最大并发连接数为10000
        initializeConnectionLatch();
        // Start poller threads
        //创建有2个守护线程的poller数组,并且启动这2个守护线程
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        //启动Acceptor线程,监听网络TCP/IP的连接请求
        startAcceptorThreads();
    }

创建Acceptor线程:startAcceptorThreads() 对应上述流程第6步

protected final void startAcceptorThreads() {
    //创建一个线程数为1的Acceptor的数组
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];
    for (int i = 0; i < count; i++) {
        //创建Acceptor监听网络TCP/IP请求,具体实现在NioEndpoint中
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

Acceptor线程启动逻辑:对应上述流程第7-9步

/**
 * 是一个监听网络TCP/IP连接的后台线程,接收到连接后会交给合适的处理器(SocketProcessor/ConnectionHandler)处理,Acceptor并非直接将连接请求交给SocketProcessor,而是经过了Poller
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;
        // Loop until we receive a shutdown command
        // 直到接受到关闭连接的命令,否则一直循环下去
        while (running) {
            // Loop if endpoint is paused
            //如果处于暂定状态,则一直等到paused为false
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;
            try {
                //if we have reached max connections, wait
                // 如果还没有到达最大连接数,则当前连接数量做加一操作,
                // 如果到达最大连接数则让当前接收线程处于等待状态,直到有连接被释放
                countUpOrAwaitConnection();
                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    //从服务器套接字serverSock接收下一个进入的连接请求,如果没有连接请求,则一直阻塞,直到有连接到来才继续后续的处理
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;
                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 将当前连接请求注册到Poller中
                    if (!setSocketOptions(socket)) {
                        //关闭socket,并将连接数-1
                        closeSocket(socket);
                    }
                } else {
                    //关闭socket,并将连接数-1
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
 
    private void closeSocket(SocketChannel socket) {
        countDownConnection();
        try {
            socket.socket().close();
        } catch (IOException ioe)  {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
        try {
            socket.close();
        } catch (IOException ioe) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("endpoint.err.close"), ioe);
            }
        }
    }
}

将TCP连接请求注册到Poller线程对象中,对应上述流程第10步

/**
* 处理指定的连接套接字
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
*  and processing may continue, <code>false</code> if the socket needs to be
*  close immediately
*/
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
           if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        //从Poller中获取一个线程来处理新接收到的请求连接,委托其完成相应处理,委托完成后,当前线程继续自己被设定的监听接收委托任务
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

Poller中定义有一个Selector(选择器),它是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件,一个单独的线程可以管理多个channel,从而管理多个网络连接。Poller线程开启后一直处于运行状态,待Poller中的selector.select()有返回值时,继续处理后续流程,否则一直阻塞,对应上述流程第11-12步

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                hasEvents = events();
                //获取selector中的连接数量,只有keyCount大于0时,才处理后续流程,否则一直阻塞
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, attachment);
            }
        }//while
        //process timeouts
        timeout(keyCount,hasEvents);
    }//while
    getStopLatch().countDown();
}


protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

创建SocketProcessorBase来处理请求NioSocketWrapper,并从工作线程池中获取一个工作线程来处理SocketProcessorBase,对应上述第13步

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

线程分析

项目启动时,会有一个主线程main来启动tomcat容器,并且初始化一些操作(开启10个工作线程、创建并运行2个Poller线程、开启监听TCP/IP连接请求),等到执行到Acceptor中的socket = serverSock.accept()(监听TCP/IP连接请求)时,main主线程就结束了


image.png

下图中的http-nio-8080-exec-1至http-nio-8080-exec-10即是startInternal开启的10个工作线程,当前并没有任务交给工作线程,所以是WAIT状态;http-nio-8080-ClientPoller-0和http-nio-8080-ClientPoller-1是Poller的2个创建完成后就启动的守护线程,状态是RUNNING;http-nio-8080-Aceeptor-0线程是等到有网络TCP/IP连接进来后,才创建的,创建完成后立即启动,状态为RUNNING;


image.png

当Acceptor监听到有请求进来后,首先会启动http-nio-8080-Accptor-0线程,此线程会一直处于运行状态,监听网络连接请求;然后会从10个工作线程中获取一个线程(http-nio-8080-exec-1)来执行刚刚进来的请求


image.png

直到请求结束后,会将http-nio-8080-exec-1放回线程池,此时http-nio-8080-exec-1的状态是WAIT


image.png
上一篇 下一篇

猜你喜欢

热点阅读