Tomcat

Tomcat 接收连接的accept流程

2018-11-29  本文已影响63人  晴天哥_王志

开篇

 这篇文章的主要目的是分析下Tomcat在处理连接请求的整个过程,参考了前人的文章并在文末指出,通过时序图能够较清楚的走通整个流程。

Tomcat处理流程

Tomcat处理流程

说明:

Connector 启动以后会启动一组线程用于不同阶段的请求处理过程,Acceptor、Poller、worker 所在的线程组都维护在 NioEndpoint 中。

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public void startInternal() throws Exception {
            // 创建worker线程组
            if ( getExecutor() == null ) {
                createExecutor();
            }

            // Poller线程组由一堆线程组成
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
}

public abstract class AbstractEndpoint<S> {
    // Acceptor线程组由一堆线程组成
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

    // worker的线程组由executor创建线程池组成
    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
}

请求处理过程分解

Acceptor接收连接过程

Acceptor.jpg

说明:
Acceptor接受的新连接没有立即注册到selector当中,需要先封装成PollerEvent对象后保存至PollerEvent队列当中,Poller对象会消费PollerEvent队列,类似生产消费模型。

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    private volatile ServerSocketChannel serverSock = null;
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        public void run() {

            while (running) {
                state = AcceptorState.RUNNING;
                try {
                    SocketChannel socket = null;
                    try {
                        // 监听socket负责接收新连接
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                    }

                    if (running && !paused) {
                        // 处理接受到的socket对象
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } 
                } catch (Throwable t) {
                }
            }
            state = AcceptorState.ENDED;
        }
    }


    protected boolean setSocketOptions(SocketChannel socket) {
        try {
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            
            channel = new NioChannel(socket, bufhandler);
            // 注册到Poller当中
            getPoller0().register(channel);
        } catch (Throwable t) {
        }

        return true;
    }

    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }


    public class Poller implements Runnable {
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            r = new PollerEvent(socket,ka,OP_REGISTER);
            
            // 添加PollerEvent队列当中
            addEvent(r);
        }


        private void addEvent(PollerEvent event) {
            // 投入到PollerEvent队列当中
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }
    }

}

Poller处理请求

Poller.jpg

说明:
Poller会消费PollerEvent队列(由Acceptor进行投递),并注册到Selector当中。当注册到Selector的socket数据可读的时候将socket封装成SocketProcessor对象,投递到Executor实现的线程池进行处理。

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    public static class PollerEvent implements Runnable {
        private NioChannel socket;
        private int interestOps;
        private NioSocketWrapper socketWrapper;

        public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
            reset(ch, w, intOps);
        }

        public void run() {
            if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(
                            socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {

                }
            }
        }
    }


    public class Poller implements Runnable {

        public void run() {
            while (true) {
                // events()负责处理PollerEvent事件并注册到selector当中
                hasEvents = events();
                keyCount = selector.select(selectorTimeout);

                // 处理新接受的socket的读写事件
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();

                    processKey(sk, attachment);
                }
            }
        }

        // 处理读写事件
        protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            if (sk.isReadable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                    closeSocket = true;
                }
             }
                            
            if (!closeSocket && sk.isWritable()) {
                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                    closeSocket = true;
                }
            }
        }
    }
}


public abstract class AbstractEndpoint<S> {
    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {

        try {
            sc = createSocketProcessor(socketWrapper, event);
            Executor executor = getExecutor();
            // 注册到Worker的线程池ThreadPoolExecutor。
            if (dispatch && executor != null) {
                executor.execute(sc);
            } 
        } catch (RejectedExecutionException ree) {
        } 

        return true;
    }
}

Worker处理具体请求

Worker.jpg

说明:

Container单个请求处理流程

StandardEngineValve

说明:

参考文章

谈谈 Tomcat 请求处理流程

招聘信息

【招贤纳士】

欢迎热爱技术、热爱生活的你和我成为同事,和贝贝共同成长。

贝贝集团诚招算法、大数据、BI、Java、PHP、android、iOS、测试、运维、DBA等人才,有意可投递zhi.wang@beibei.com

贝贝集团创建于2011年,旗下拥有贝贝网、贝店、贝贷等平台,致力于成为全球领先的家庭消费平台。

贝贝创始团队来自阿里巴巴,先后获得IDG资本、高榕资本、今日资本、新天域资本、北极光等数亿美金的风险投资。

公司地址:杭州市江干区普盛巷9号东谷创业园(上下班有多趟班车)

上一篇 下一篇

猜你喜欢

热点阅读