Tomcat源码-Poller和Acceptor简单分析

2021-10-08  本文已影响0人  Wannay

研究Tomcat的网络部分,我们主要关注的点在于NioEndpoint这个组件的startInternal方法,这个方法中的主要代码如下

    createExecutor();   //创建Worker线程池,最小工作线程数量为10,最大工作线程数量为200

    poller = new Poller();    //创建poller线程
    Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
    pollerThread.setPriority(threadPriority);
    pollerThread.setDaemon(true);
    pollerThread.start();
    startAcceptorThread();  

///////////////////startAcceptorThread//////////////////////////////
    acceptor = new Acceptor<>(this);  //创建Acceptor线程
    String threadName = getName() + "-Acceptor";
    acceptor.setThreadName(threadName);
    Thread t = new Thread(acceptor, threadName);
    t.setPriority(getAcceptorThreadPriority());
    t.setDaemon(getDaemon());
    t.start();
////////////////////startAcceptorThread////////////////////////////

主要就是创建了两个线程,其中一个线程叫做Poller,另一个线程叫做Acceptor,创建完之后立马就对这两个线程进行启动。

下面我们开始对Acceptor线程进行探索,我们主要研究的是它的run方法。

它的run方法的主要代码如下

while (!stopCalled) {
    //通过ServerSocketChannel.accept获取到连接上来的Socket
    socket = endpoint.serverSocketAccept();
    //对Socket进行设置相关参数
    endpoint.setSocketOptions(socket)
}

我们可以看到主要的任务就是死循环去接收客户端的连接,并且设置一些Socket参数信息。设置参数究竟做了什么?主要逻辑如下

protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            NioChannel channel = null;

            //如果有现有的Channel可以处理,那么先从现有的通道中pop一个出来
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }

            //如果没有现有的Channel,那么需要创建一个NioChannel
            if (channel == null) {
                //准备一个读取Socket数据的处理器(Handler)
                SocketBufferHandler bufhandler = new SocketBufferHandler.....

                //如果开启了SSL,那么需要创建一个SecureNioChannel,如果没开启SSL,准备一个普通的Channel即可
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(bufhandler);
                }
            }

            //将NioChannel和NioEndpoint封装到NioSocketWrapper中
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);  
            socketWrapper = newWrapper;

            socket.configureBlocking(false);  
            poller.register(socketWrapper);
        } catch (Throwable t) {
            /////..............////////
        }
    }

其实主要就是将SocketChannel、NioChannel、SocketBufferHandler等信息包装到NioSocketWrapper对象中,然后将其注册到poller当中。

那么注册到poller的这个方法中究竟做了什么事呢?

        public void register(final NioSocketWrapper socketWrapper) {
            //设置SocketWrapper感兴趣的事件为READ事件
            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            PollerEvent event = null;
            if (eventCache != null) {
                event = eventCache.pop();
            }
            if (event == null) {
                event = new PollerEvent(socketWrapper, OP_REGISTER);
            } else {
                event.reset(socketWrapper, OP_REGISTER);
            }
            addEvent(event);
        }

我们可以看到主要就是设置Socket的主要感兴趣的事件为读事件,以及创建一个PollerEvent对象,并执行addEvent方法把该PollerEvent加入到一个同步队列当中去。

接下来我们要研究的是Poller线程的run方法

        public void run() {
            // Loop until destroy() is called
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        //遍历Acceptor给Poller放的PollEvent同步队列,进行消费,把SocketChannel注册到Selector上
                        hasEvents = events();  
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();  //非阻塞,如果keyCount为0也会直接跳过
                        } else {
                            keyCount = selector.select(selectorTimeout);  //阻塞一定时间
                        }
                        wakeupCounter.set(0);
                    }
                    //....................
                } catch (Throwable x) {
                    //............
                }
                //获取SelectionKey的迭代器
                Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;  
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();  //通过迭代器拿到获取SelectionKey
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();  //Wrapper中封装了SelectorPool,Poller,NioChannels,EndPoint等
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {  //获取NioSocketWrapper对象
                        processKey(sk, socketWrapper);  //对事件进行处理
                    }
                }
            }
            getStopLatch().countDown();
        }

我们发现它主要做的工作就是,消费Acceptor接受的客户端请求的PollerEvent同步队列,并把该SocketChannel注册到Selector上。然后就是调用一下select方法拿到Selector上的活跃事件,然后遍历所有的SelectionKey(活跃事件)并进行处理,在processKey方法中主要就是对key进行处理。

我们也来感受一下processKey做了什么?

    //...............................
    if (sk.isReadable() || sk.isWritable()) {  //判断SelectionKey是否可读?是否可写?
        if (socketWrapper.getSendfileData() != null) {
            processSendfile(sk, socketWrapper, false);
        } else {
            unreg(sk, socketWrapper, sk.readyOps());  //避免重复读
            boolean closeSocket = false;
            // Read goes before write
            if (sk.isReadable()) {  //如果SelectionKey是可读的
                if (socketWrapper.readOperation != null) {  //如果读操作不为null
                    if (!socketWrapper.readOperation.process()) {
                        closeSocket = true;
                    } //处理Socket,processSocket能够直接交给线程池去进行处理
                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                    closeSocket = true;
                }
            }
            if (!closeSocket && sk.isWritable()) {
                if (socketWrapper.writeOperation != null) {
                    if (!socketWrapper.writeOperation.process()) {
                        closeSocket = true;
                    }
                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                    closeSocket = true;
                }
            }
            if (closeSocket) {
                cancelledKey(sk, socketWrapper);
            }
        }
    }
    //................................

我们可以发现就是回到了我们传统的java NIO的套路,判断SelectionKey是否可读/可写,然后去执行processSocket拿到对应的处理器,并交给线程池对事件进行处理。

我们可以看到,这是Tomcat自己设计的IO模型,其实和Netty的设计很类似。

上一篇 下一篇

猜你喜欢

热点阅读