EchoServer Startup (2)

2020-06-14  追梦小蜗牛
Code Analysis

Picking up where the previous post left off, the analysis had reached the following method:

    public static void fireChannelOpen(Channel channel) {
        // Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }

        channel.getPipeline().sendUpstream( // @1
                new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
    }

The code at @1 is where the event starts flowing through the UpstreamHandlers in order, from head all the way to tail...

 * <pre>
 *
 *                                            I/O Request
 *                                          via {@link Channel} or
 *                                      {@link ChannelHandlerContext}
 *                                                |
 *  +---------------------------------------------+--------------------+
 *  |                       ChannelPipeline       |                    |
 *  |                                            \|/                   |
 *  |       +----------------------+  +-----------+------------+       |
 *  |  LAST | Upstream Handler  N  |  | Downstream Handler  M  | LAST  |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  |   .             /|\                         |                .   |
 *  |   .              |                         \|/               .   |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  |   .   | Upstream Handler N-1 |  | Downstream Handler M-1 |   .   |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  |   .             /|\                         .                .   |
 *  |   .              .                          .                .   |
 *  |   .      [ Going UPSTREAM ]        [ Going DOWNSTREAM ]      .   |
 *  |   .              .                          .                .   |
 *  |   .              .                         \|/               .   |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  |   .   | Upstream Handler  2  |  | Downstream Handler  2  |   .   |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  |   .             /|\                         |                .   |
 *  |   .              |                         \|/               .   |
 *  |   .   +----------+-----------+  +-----------+------------+   .   |
 *  | FIRST | Upstream Handler  1  |  | Downstream Handler  1  | FIRST |
 *  |       +----------+-----------+  +-----------+------------+       |
 *  |                 /|\                         |                    |
 *  +------------------+--------------------------+--------------------+
 *                     |                         \|/
 *  +------------------+--------------------------+--------------------+
 *  |             I/O Threads (Transport Implementation)               |
 *  +------------------------------------------------------------------+

Stepping into the pipeline's sendUpstream method:

    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e); // @2
        } catch (Throwable t) {
            notifyHandlerException(e, t); // @3
        }
    }

At @2, the handler associated with the DefaultChannelHandlerContext is fetched and its handleUpstream method is called. In this example the first UpstreamHandler is EchoHandler and the second is Binder, so EchoHandler's handleUpstream runs first. Because EchoHandler extends SimpleChannelUpstreamHandler, this is really SimpleChannelUpstreamHandler's handleUpstream, shown below:

    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

        if (e instanceof MessageEvent) {
            messageReceived(ctx, (MessageEvent) e);
        } else if (e instanceof WriteCompletionEvent) {
            WriteCompletionEvent evt = (WriteCompletionEvent) e;
            writeComplete(ctx, evt);
        } else if (e instanceof ChildChannelStateEvent) {
            ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
            if (evt.getChildChannel().isOpen()) {
                childChannelOpen(ctx, evt);
            } else {
                childChannelClosed(ctx, evt);
            }
        } else if (e instanceof ChannelStateEvent) {
            ChannelStateEvent evt = (ChannelStateEvent) e;
            switch (evt.getState()) {
            case OPEN:
                if (Boolean.TRUE.equals(evt.getValue())) { // @4
                    channelOpen(ctx, evt);
                } else {
                    channelClosed(ctx, evt);
                }
                break;
            case BOUND:
                if (evt.getValue() != null) {
                    channelBound(ctx, evt);
                } else {
                    channelUnbound(ctx, evt);
                }
                break;
            case CONNECTED:
                if (evt.getValue() != null) {
                    channelConnected(ctx, evt);
                } else {
                    channelDisconnected(ctx, evt);
                }
                break;
            case INTEREST_OPS:
                channelInterestChanged(ctx, evt);
                break;
            default:
                ctx.sendUpstream(e);
            }
        } else if (e instanceof ExceptionEvent) {
            exceptionCaught(ctx, (ExceptionEvent) e);
        } else {
            ctx.sendUpstream(e);
        }
    }
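To make the branching above easier to follow, here is a minimal sketch that reduces the same state/value dispatch to a pure function. `StateDispatchSketch`, its `State` enum and the returned strings are stand-ins invented for illustration; they are not Netty classes, and the sketch only models the ChannelStateEvent part of handleUpstream.

```java
/** Toy model of how a (state, value) pair selects a callback. Not Netty code. */
final class StateDispatchSketch {

    /** Mirrors the four ChannelState constants that appear in the listing. */
    enum State { OPEN, BOUND, CONNECTED, INTEREST_OPS }

    /** Returns the name of the callback the dispatch would invoke. */
    static String callbackFor(State state, Object value) {
        switch (state) {
        case OPEN:
            // OPEN carries a Boolean: TRUE means opened, anything else means closed.
            return Boolean.TRUE.equals(value) ? "channelOpen" : "channelClosed";
        case BOUND:
            // BOUND carries the local address, or null once unbound.
            return value != null ? "channelBound" : "channelUnbound";
        case CONNECTED:
            // CONNECTED carries the remote address, or null once disconnected.
            return value != null ? "channelConnected" : "channelDisconnected";
        default:
            return "channelInterestChanged";
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackFor(State.OPEN, Boolean.TRUE));
        System.out.println(callbackFor(State.BOUND, null));
    }
}
```

The point of the sketch is that the event type alone does not pick the callback; the value attached to the event decides between the "on" and "off" variant of each state.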

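The event here is a ChannelStateEvent with state OPEN and value Boolean.TRUE, so the @4 branch is taken and channelOpen is called. EchoHandler leaves channelOpen at the default from SimpleChannelUpstreamHandler, which simply forwards the event with ctx.sendUpstream(e). Stepping into that call: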

        public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); // @5
            if (next != null) {
                DefaultChannelPipeline.this.sendUpstream(next, e); // @6
            }
        }
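The hop from one context to the next can be modelled with a small linked list. This is a rough sketch under assumptions: `UpstreamChainSketch`, `Ctx` and the downstream-only "encoder" entry are hypothetical names added for illustration, and every handler is assumed to forward the event the way the default callbacks do.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy linked list of handler contexts; not Netty's DefaultChannelPipeline. */
final class UpstreamChainSketch {

    static final class Ctx {
        final String name;
        final boolean upstream; // true if the handler can process upstream events
        Ctx next;               // neighbour towards the tail

        Ctx(String name, boolean upstream) {
            this.name = name;
            this.upstream = upstream;
        }
    }

    /** Skips contexts whose handler cannot process upstream events. */
    static Ctx actualUpstream(Ctx ctx) {
        while (ctx != null && !ctx.upstream) {
            ctx = ctx.next;
        }
        return ctx;
    }

    /** Delivers an event from head towards tail and records which handlers saw it. */
    static List<String> fire(Ctx head) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = actualUpstream(head); c != null; c = actualUpstream(c.next)) {
            visited.add(c.name);
        }
        return visited;
    }

    /** Builds echo -> encoder (downstream-only, hypothetical) -> binder. */
    static List<String> demo() {
        Ctx echo = new Ctx("echo", true);
        Ctx encoder = new Ctx("encoder", false);
        Ctx binder = new Ctx("binder", true);
        echo.next = encoder;
        encoder.next = binder;
        return fire(echo);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the article's pipeline there is no downstream-only handler between EchoHandler and Binder, so the skip never triggers there; the extra entry is only included to show why a helper like getActualUpstreamContext is needed at all.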

@5 looks up the next handler in the current pipeline, which is Binder, and @6 calls sendUpstream again with it. The state is still OPEN, so Binder's channelOpen method is invoked:

        @Override
        public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            try {
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

                // Split options into two categories: parent and child.
                Map<String, Object> allOptions = getOptions();
                Map<String, Object> parentOptions = new HashMap<String, Object>();
                for (Entry<String, Object> e: allOptions.entrySet()) {
                    if (e.getKey().startsWith("child.")) {
                        childOptions.put(
                                e.getKey().substring(6),
                                e.getValue());
                    } else if (!e.getKey().equals("pipelineFactory")) {
                        parentOptions.put(e.getKey(), e.getValue());
                    }
                }

                // Apply parent options.
                evt.getChannel().getConfig().setOptions(parentOptions);
            } finally {
                ctx.sendUpstream(evt); // @7
            }

            boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); // @8
            assert finished;
        }
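The option-splitting loop in channelOpen can be tried in isolation. The sketch below copies the same prefix rule into a standalone class; `OptionSplitSketch`, `Split` and the sample option values are illustrative assumptions, not part of Netty.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy version of the parent/child option split; not the Netty Binder itself. */
final class OptionSplitSketch {

    /** Holds the two maps produced by the split. */
    static final class Split {
        final Map<String, Object> parent = new HashMap<>();
        final Map<String, Object> child = new HashMap<>();
    }

    static Split split(Map<String, Object> allOptions) {
        Split result = new Split();
        for (Map.Entry<String, Object> e : allOptions.entrySet()) {
            String key = e.getKey();
            if (key.startsWith("child.")) {
                // Strip the 6-character "child." prefix before storing.
                result.child.put(key.substring(6), e.getValue());
            } else if (!key.equals("pipelineFactory")) {
                // "pipelineFactory" is handled separately, so it is skipped here.
                result.parent.put(key, e.getValue());
            }
        }
        return result;
    }

    /** Example input; the option names and values are illustrative only. */
    static Split demo() {
        Map<String, Object> all = new LinkedHashMap<>();
        all.put("backlog", 128);
        all.put("child.tcpNoDelay", true);
        all.put("pipelineFactory", new Object());
        return split(all);
    }

    public static void main(String[] args) {
        Split s = demo();
        System.out.println("parent=" + s.parent + " child=" + s.child);
    }
}
```

Options without the prefix configure the listening (parent) channel, while "child." options are set aside for the channels created when connections are accepted.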

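The code at @7 keeps propagating the event. The code at @8 binds the opened channel to the specified local address and puts the resulting future into the queue. Stepping into bind, shown below, its main job is to send the bind event downstream so that the ChannelDownstreamHandlers can process it: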

    public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent( // @9
                channel, future, ChannelState.BOUND, localAddress));
        return future;
    }

Stepping into the sendDownstream method called at @9:

    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e); // @10
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }
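The fallback to the sink can be illustrated with a very small model. This is a sketch under assumptions: `SinkFallbackSketch` and its string events are hypothetical, and handlers are reduced to functions that transform the event and always pass it on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Toy downstream path: handlers run tail-to-head, then the sink receives the event. */
final class SinkFallbackSketch {

    /** Events the sink has received, in order. */
    final List<String> sunk = new ArrayList<>();

    /** Downstream handlers in pipeline order (index 0 is the head). */
    final List<UnaryOperator<String>> downstreamHandlers = new ArrayList<>();

    void sendDownstream(String event) {
        // Walk from the tail towards the head.
        for (int i = downstreamHandlers.size() - 1; i >= 0; i--) {
            event = downstreamHandlers.get(i).apply(event);
        }
        // With no downstream handlers the loop is skipped and the sink
        // sees the event untouched.
        sunk.add(event);
    }

    public static void main(String[] args) {
        SinkFallbackSketch pipeline = new SinkFallbackSketch();
        pipeline.sendDownstream("BOUND");
        System.out.println(pipeline.sunk);
    }
}
```

Either way the sink is the last stop for a downstream event, which is why the bind request ends up in eventSunk even though the example registers no DownstreamHandler.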

This example has no DownstreamHandler, so execution reaches @10 and then enters the eventSunk and handleServerSocket methods:

    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
    }

    private void handleServerSocket(ChannelEvent e) {
        if (!(e instanceof ChannelStateEvent)) {
            return;
        }

        ChannelStateEvent event = (ChannelStateEvent) e;
        NioServerSocketChannel channel =
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();

        switch (state) {
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {
                bind(channel, future, (SocketAddress) value); // @11
            } else {
                close(channel, future);
            }
            break;
        }
    }

Execution takes the @11 branch. We are still inside the NioServerSocketPipelineSink class; the method called at @11 is:

    private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {

        boolean bound = false;
        boolean bossStarted = false;
        try {
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            future.setSuccess(); // @12
            fireChannelBound(channel, channel.getLocalAddress()); // @13

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            bossExecutor.execute(
                    new IoWorkerRunnable(
                            new ThreadRenamingRunnable(
                                    new Boss(channel), // @14
                                    "New I/O server boss #" + id +
                                    " (channelId: " + channel.getId() +
                                    ", " + channel.getLocalAddress() + ')')));
            bossStarted = true;
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } finally {
            if (!bossStarted && bound) {
                close(channel, future);
            }
        }
    }

@12: marks this future as a success and notifies all listeners.
@14: this eventually runs the run method of the Boss thread, shown below:

        public void run() {
            final Thread currentThread = Thread.currentThread();

            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }

                    SocketChannel acceptedSocket = channel.socket.accept(); // @15
                    if (acceptedSocket != null) {
                        registerAcceptedChannel(acceptedSocket, currentThread); // @16
                    }
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedSelectorException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                    break;
                } catch (IOException e) {
                    logger.warn(
                            "Failed to accept a connection.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }

            closeSelector();
        }
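For readers who want to try the accept loop outside Netty, the following is a rough sketch on plain java.nio. It assumes a non-blocking ServerSocketChannel bound to a loopback port chosen by the OS; `BossLoopSketch`, the `accepted` queue and the backlog of 50 are illustrative choices rather than anything taken from the Netty source.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy accept loop on plain java.nio; a sketch, not Netty's Boss class. */
final class BossLoopSketch implements Runnable {
    private final ServerSocketChannel server;
    private final Selector selector;
    /** Accepted connections wait here; Netty hands them to a worker instead. */
    final BlockingQueue<SocketChannel> accepted = new LinkedBlockingQueue<>();

    BossLoopSketch() throws IOException {
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        // Port 0 lets the OS pick a free port; the backlog of 50 is arbitrary.
        server.socket().bind(
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (server.isOpen()) {
                // Wait up to one second for a pending connection.
                if (selector.select(1000) > 0) {
                    selector.selectedKeys().clear();
                }
                SocketChannel socket = server.accept(); // null when nothing is pending
                if (socket != null) {
                    accepted.add(socket);
                }
            }
        } catch (ClosedChannelException | ClosedSelectorException e) {
            // Closed as requested.
        } catch (IOException e) {
            System.err.println("Failed to accept a connection: " + e);
        } finally {
            try { selector.close(); } catch (IOException ignored) { }
        }
    }

    void shutdown() throws IOException {
        server.close();
        selector.wakeup(); // let a blocked select() notice the close promptly
    }

    /** Connects one loopback client and reports whether the loop accepted it. */
    static boolean demo() {
        try {
            BossLoopSketch boss = new BossLoopSketch();
            Thread thread = new Thread(boss, "boss-sketch");
            thread.start();
            InetSocketAddress address =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), boss.port());
            try (SocketChannel client = SocketChannel.open(address)) {
                SocketChannel peer = boss.accepted.poll(5, TimeUnit.SECONDS);
                if (peer != null) {
                    peer.close();
                }
                return peer != null;
            } finally {
                boss.shutdown();
                thread.join(5000);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + demo());
    }
}
```

The one-second select timeout plays the same role as in the listing above: it gives the loop a regular chance to notice that the server channel has been closed.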

@15 accepts an incoming client connection, which is represented in code as a SocketChannel.
Stepping into registerAcceptedChannel at @16:

        private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            try {
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
                NioWorker worker = nextWorker(); // @17
                worker.register(new NioAcceptedSocketChannel( // @18
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
            } catch (Exception e) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
                try {
                    acceptedSocket.close();
                } catch (IOException e2) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }
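The listing does not show how nextWorker picks a worker. One common rule is round-robin over the array, sketched below as an assumption about the general technique rather than a copy of the Netty implementation; `RoundRobinSketch` and the string "workers" are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy round-robin selector; the selection rule is an assumption for illustration. */
final class RoundRobinSketch {
    private final String[] workers;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... workers) {
        this.workers = workers.clone();
    }

    /** Returns the next worker, cycling through the array in order. */
    String next() {
        // Math.abs keeps the slot non-negative after the counter overflows.
        return workers[Math.abs(index.getAndIncrement() % workers.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch("w0", "w1", "w2");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next());
        }
    }
}
```

Using an AtomicInteger keeps the selection safe when several boss threads share the same worker array.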

@17 picks a NioWorker from the NioWorker array according to some selection rule; that worker will handle the current connection.
Stepping into register at @18:

    void register(NioSocketChannel channel, ChannelFuture future) {

        boolean server = !(channel instanceof NioClientSocketChannel);
        Runnable registerTask = new RegisterTask(channel, future, server);
        Selector selector;

        synchronized (startStopLock) {
            if (!started) {
                // Open a selector if this worker didn't start yet.
                try {
                    this.selector = selector = Selector.open(); // @19
                } catch (Throwable t) {
                    throw new ChannelException(
                            "Failed to create a selector.", t);
                }

                // Start the worker thread with the new Selector.
                String threadName =
                    (server ? "New I/O server worker #"
                            : "New I/O client worker #") + bossId + '-' + id;

                boolean success = false;
                try {
                    executor.execute(
                            new IoWorkerRunnable(
                                    new ThreadRenamingRunnable(this, threadName))); // @20
                    success = true;
                } finally {
                    if (!success) {
                        // Release the Selector if the execution fails.
                        try {
                            selector.close();
                        } catch (Throwable t) {
                            logger.warn("Failed to close a selector.", t);
                        }
                        this.selector = selector = null;
                        // The method will return to the caller at this point.
                    }
                }
            } else {
                // Use the existing selector if this worker has been started.
                selector = this.selector;
            }

            assert selector != null && selector.isOpen();

            started = true;
            boolean offered = registerTaskQueue.offer(registerTask);
            assert offered;
        }

        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
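The last few lines of register combine a task queue with a compareAndSet guard so that the expensive wakeup call is issued at most once per loop iteration. The sketch below models only that idea; `WakeupGuardSketch` is hypothetical, and a counter stands in for selector.wakeup() so the effect can be observed without a real Selector.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the queue-plus-wakeup-guard pattern; not Netty's NioWorker. */
final class WakeupGuardSketch {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    /** Counts how often the (simulated) selector.wakeup() would be called. */
    final AtomicInteger wakeups = new AtomicInteger();

    /** Called from any thread: enqueue first, then wake the loop if needed. */
    void submit(Runnable task) {
        tasks.offer(task);
        if (wakenUp.compareAndSet(false, true)) {
            wakeups.incrementAndGet(); // stands in for selector.wakeup()
        }
    }

    /** One simplified loop iteration on the worker thread; returns tasks run. */
    int runOnce() {
        wakenUp.set(false); // allow the next submit to wake the loop again
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        WakeupGuardSketch worker = new WakeupGuardSketch();
        worker.submit(() -> { });
        worker.submit(() -> { });
        System.out.println("wakeups=" + worker.wakeups.get()
                + " ran=" + worker.runOnce());
    }
}
```

The real loop has a select call between resetting the flag and draining the queue, which is what makes the race described in the comments of the run method possible; the sketch leaves that part out.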

@19: opens a selector if this worker has not started yet.
@20: executes the run method of `this`, that is, NioWorker's run method, shown below:

    public void run() {
        thread = Thread.currentThread();

        boolean shutdown = false;
        Selector selector = this.selector;
        for (;;) {
            wakenUp.set(false);

            if (CONSTRAINT_LEVEL != 0) {
                selectorGuard.writeLock().lock();
                    // This empty synchronization block prevents the selector
                    // from acquiring its lock.
                selectorGuard.writeLock().unlock();
            }

            try {
                int selectedKeyCount = selector.select(500);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }

                processRegisterTaskQueue(); // @21
                processWriteTaskQueue(); // @22

                if (selectedKeyCount > 0) {
                    processSelectedKeys(selector.selectedKeys()); // @23
                }

                // Exit the loop when there's nothing to handle.
                // The shutdown flag is used to delay the shutdown of this
                // loop to avoid excessive Selector creation when
                // connections are registered in a one-by-one manner instead of
                // concurrent manner.
                if (selector.keys().isEmpty()) {
                    if (shutdown ||
                        executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

                        synchronized (startStopLock) {
                            if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                started = false;
                                try {
                                    selector.close();
                                } catch (IOException e) {
                                    logger.warn(
                                            "Failed to close a selector.", e);
                                } finally {
                                    this.selector = null;
                                }
                                break;
                            } else {
                                shutdown = false;
                            }
                        }
                    } else {
                        // Give one more second.
                        shutdown = true;
                    }
                } else {
                    shutdown = false;
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

@21 and @22 process the tasks waiting in the queues, and @23 handles the actual read and write readiness events. The processSelectedKeys method looks like this:

    private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_READ) != 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    write(k);
                }
            } catch (CancelledKeyException e) {
                close(k);
            }
        }
    }
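The readiness check above is plain bit masking on readyOps. Here is a minimal sketch of the same decision logic as a pure function; `ReadyOpsSketch` and the `readSucceeds` flag, which stands in for the return value of read(k), are assumptions made for illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Toy version of the read/write decision for one selected key. */
final class ReadyOpsSketch {

    /** Lists the actions taken for a key with the given ready-operation bits. */
    static List<String> actionsFor(int readyOps, boolean readSucceeds) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            actions.add("read");
            if (!readSucceeds) {
                // Connection already closed: no need to handle write.
                return actions;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("write");
        }
        return actions;
    }

    public static void main(String[] args) {
        int both = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(actionsFor(both, true));
        System.out.println(actionsFor(both, false));
    }
}
```

Read is always tried before write, and a failed read short-circuits the write for that key.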

processSelectedKeys mainly iterates over the SelectionKeys and handles the actual read or write for each one. There is quite a lot in there, so it is left for the next post...

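Summary:
Something that looks simple turns out not to be. The lower layers do a great deal of work, and that is where the designers' thinking lives. Nothing that looks simple is ever quite that simple; it takes time to appreciate.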
