Tomcat源码分析2 之 Protocol实现分析

2017-05-29  本文已影响0人  江之北

简介

本文继续以tomcat8 为例,简单分析下Tomcat的几种protocol的差异,以及处理链接的细节。
  主要有以下内容:</br>
  - tomcat protocol的分类</br>
  - tomcat protocol的实现</br>
  - 各个protocol的差异</br>

Tomcat protocol 配置

参考官方文档,tomcat protocol配置,可以看到protocol主要是有四种,默认使用的HTTP/1.1 ,对于tomcat8 以更高版本来说,HTTP/1.1的配置会默认使用nio来处理,也就是org.apache.coyote.http11.Http11NioProtocol

其他的几种:</br>
  org.apache.coyote.http11.Http11Protocol java的bio connector,使用ServerSocket处理请求。</br>
  org.apache.coyote.http11.Http11NioProtocol java的nio connector,使用SocketChannel处理请求.</br>
  org.apache.coyote.http11.Http11Nio2Protocol java7新出的aio connector,使用AsynchronousSocketChannel处理请求。</br>
  org.apache.coyote.http11.Http11Nio2Protocol tomcat的native library connector。(这个我们稍后再讲)</br>
  
  对于tomcat8 以更高版本来说,HTTP/1.1的配置会默认使用nio来处理,也就是
org.apache.coyote.http11.Http11NioProtocol。由org.apache.catalina.connector.ConnectorsetProtocol()方法也可以看到:

    public void setProtocol(String protocol) {
        //如果配置了apr,会默认使用apr(apr后面再讲)
        if (AprLifecycleListener.isAprAvailable()) {
            if ("HTTP/1.1".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11AprProtocol");
            } else if ("AJP/1.3".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.ajp.AjpAprProtocol");
            } else if (protocol != null) {
                setProtocolHandlerClassName(protocol);
            } else {
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11AprProtocol");
            }
        } else {
           //这里可以看到,HTTP/1.1是server.xml的默认配置,会默认使用nio处理
            if ("HTTP/1.1".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.http11.Http11NioProtocol");
            } else if ("AJP/1.3".equals(protocol)) {
                setProtocolHandlerClassName
                    ("org.apache.coyote.ajp.AjpNioProtocol");
            } else if (protocol != null) {
            //其他情况下,使用指定的protocol
                setProtocolHandlerClassName(protocol);
            }
        }

    }

Tomcat protocol的处理流程

ConnectorstartInternal()方法会启动protocol

    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        ...
        try {
            protocolHandler.start();
        } catch (Exception e) {
            ...
        }
    }

这里的protocolHandler,就是上面setProtocol()方法指定的protocol。暂时以Http11NioProtocol为例,分析下请求处理流程。上面startInternal()方法接下来会到org.apache.coyote.AbstractProtocolstart()方法。

    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
            //每个protocol都有一个对应的enpoint,最终是由endpoint来负责处理链接的。
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    }

每个protocol对应了一个Endpoint</br>
  Http11Protocol对应org.apache.tomcat.util.net.JIoEndpoint</br>
  Http11NioProtocol对应org.apache.tomcat.util.net.NioEndpoint</br>
  Http11Nio2Protocol对应org.apache.tomcat.util.net.Nio2Endpoint</br>
  继续查看Nio2EndpointstartInternal()方法.

    @Override
    public void startInternal() throws Exception {

        if (!running) {
            ...
            //初始化最大连接数限制,在server.xml中可配置
            initializeConnectionLatch();

            // Start poller threads
            // poller 主要负责检查各个 Selector 的状态以及处理超时等
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            //启动接受链接的线程
            startAcceptorThreads();
        }
    }

startAcceptorThreads()方法会启动Endpoint内部的Acceptor。继续查看org.apache.tomcat.util.net.NioEndpoint.Acceptorrun()方法:

        @Override
        public void run() {
            ...
            // Loop until we receive a shutdown command
            while (running) {
                ...
                try {
                    //检查下当前是否已经达到了最大链接数
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server socket
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                       ...
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller if successful
                    // setSocketOptions 会把channel添加到poller
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } ...
            }
            state = AcceptorState.ENDED;
        }
    }

从代码中可以看到,使用 SocketChannel 接受连接,然后通过 setSocketOptions 方法把 channel 交给 poller 处理。setSocketOptions(SocketChannel socket) 方法则是真正处理链接的地方。

    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            NioChannel channel = nioChannels.pop();
            ...
            //会把 调用poller的 register 方法,把 channel 交给 poller
            getPoller0().register(channel);
        } catch (Throwable t) {
           ...
        }
        return true;
    }

getPoller0() 会根据设置的poll线程数,返回一个当前可用的 Poller 对象,使用Round robin算法实现一个简单的负载均衡。

    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }

接下来查看 register()方法

        public void register(final NioChannel socket) {
            socket.setPoller(this);
            // 新建一个 KeyAttachment 对象,保存跟socket相关的一些信息
            KeyAttachment ka = new KeyAttachment(socket);
            ka.setPoller(this);
            ka.setTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            PollerEvent r = eventCache.pop();

            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            //比较重要的是这句,把 r 添加到当前的 events 队列中。
            addEvent(r);
        }

上面可以看到,每个请求进来之后,会通过 register 方法创建一个 PollerEvent 对象并添加到当前的 SynchronizedQueue<PollerEvent> events 队列中。
接下来看 Poller的run()方法,上面已经说过,Poller 在startInternal()方法调用的时候创建并启动。

        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                try {
                    ...
                    boolean hasEvents = false;
                    // Time to terminate?
                    if (close) {
                        events();
                        timeout(0, false);
                        ...
                        break;
                    } else {
                        //调用 events() 方法
                        hasEvents = events();
                    }
                }
              ...
            }//while
            stopLatch.countDown();
        }

接下来查看events()方法

        public boolean events() {
            boolean result = false;
            PollerEvent pe = null;
            while ( (pe = events.poll()) != null ) {
                result = true;
                try {
                    //调用 PollerEvent 的 run() 方法
                    pe.run();
                    pe.reset();
                    if (running && !paused) {
                        eventCache.push(pe);
                    }
                } catch ( Throwable x ) {
                    log.error("",x);
                }
            }
            return result;
        }

可以看到,在tomcat启动之后,会启动相应的endpoint的Acceptor来接受请求,同时启动相应的Poller来处理请求。</br>
  PollerEvent的run方法

        @Override
        public void run() {
            //对于新增的链接,会注册给 poller 的 selector 处理。
            if ( interestOps == OP_REGISTER ) {
                try {
                //把当前的key注册给poller中的selector对象,准备后续处理。
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                ...
            }//end if
        }//run

再回过来看 Poller 的 run() 方法。里面有一段

                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            attachment.access();
                            iterator.remove();
                            processKey(sk, attachment);
                        }
                    }//while

会检查selector所有的key,然后处理调用processKey()进行处理。
processKey() 会根据配置的Executer调用SocketProcessorrun() 方法.

接下来看SocketProcessorrun()方法最终会调用doRun()方法,在doRun()里有如下代码:

        if (status == null) {
            state = handler.process(ka, SocketStatus.OPEN_READ);
       } else {
          state = handler.process(ka, status);
       }

接下来会调用到handlerprocess方法,这个handler就是在Http11NioProtocol的构造方法里由setHandler设置的handler,也就是Http11ConnectionHandler 继承了 AbstractConnectionHandler,接着来到 org.apache.coyote.AbstractProtocol.AbstractConnectionHandlerprocess方法,这里会对每个socket做处理。包括调用具体的servlet处理业务等等。

总结

tomcat在启动的时候,根据配置的protocol,启动不同的Endpoint中的Acceptor 和 Poller。Acceptor负责接受请求,Poller负责调用线程池执行业务。

上一篇下一篇

猜你喜欢

热点阅读