程序猿之路

NIO源码分析(二)

2017-05-03  本文已影响94人  三斤牛肉

紧跟前文,先把前面缺的例子补全,接着一步一步分析。
这里的内容比较少,直接以注释的形式补充在每一行上,有必要的地方会在底下补上图文。

public void dispatch(SelectionKey key) throws IOException, InterruptedException {
        if (key.isAcceptable()) {
//前文中已经轮训过一遍updateList,通过
//ski.channel.translateAndSetReadyOps已经将准备好的操作set到key中,
//这里的key有4种操作OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT,
//这里只有ServerSocketChannel支持accept操作,对应的操作如最底下
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//accept操作必定是ServerSocketChannel
            // 接受一个连接. 底下详细讲
            SocketChannel sc = ssc.accept();
            
            // 对新的连接的channel注册read事件. 
            sc.configureBlocking(false);    
                            //这里的selector是另一个了
            sc.register(readBell.getSelector(), SelectionKey.OP_READ);
            
            // 如果读取线程还没有启动,那就启动一个读取线程.
            synchronized(NioServer.this) {
                if (!NioServer.this.isReadBellRunning) {
                    NioServer.this.isReadBellRunning = true;
                    new Thread(readBell).start();
                }
            }
            
        } else if (key.isReadable()) {
            // 这是一个read事件,并且这个事件是注册在socketchannel上的.
            SocketChannel sc = (SocketChannel) key.channel(); //这个channel就是 ssc.accept()对应的那个
            // 写数据到buffer
            int count = sc.read(temp);
            if (count < 0) {
                // 客户端已经断开连接.
                key.cancel();
                sc.close();
                return;
            }
            // 切换buffer到读状态,内部指针归位.
            temp.flip();
            String msg = Charset.forName("UTF-8").decode(temp).toString();
            System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
            
            Thread.sleep(1000);
            // echo back.
            sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
            
            // 清空buffer
            temp.clear();
        }
    }
    
}

核心语句 ssc.accept()

public SocketChannel accept() throws IOException {
    synchronized (lock) {//加锁
        if (!isOpen())
            throw new ClosedChannelException();
        if (!isBound())
            throw new NotYetBoundException();
        SocketChannel sc = null;

        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        try {
            begin();//和下面的end()配对,用于线程被中断时关闭channel,这里另开一篇再讲
            if (!isOpen())
                return null;
            thread = NativeThread.current();
            for (;;) {
                n = accept0(this.fd, newfd, isaa);//native函数,解释:
    // Accepts a new connection, setting the given file descriptor to refer to
    // the new socket and setting isaa[0] to the socket's remote address.
    // Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
    // connections are pending) or IOStatus.INTERRUPTED.

                if ((n == IOStatus.INTERRUPTED) && isOpen())//如果状态是INTERRUPTED且channel还是open的则继续尝试
                    continue;
                break;
            }
        } finally {
            thread = 0;
            end(n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;

        IOUtil.configureBlocking(newfd, true);
        InetSocketAddress isa = isaa[0];
        //新建一个socketchannel供read/write操作使用
        sc = new SocketChannelImpl(provider(), newfd, isa);
        //IP端口权限检测
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(),
                               isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    }
}

就绪操作与通道对应关系:

OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
客户端 SocketChannel Y Y Y
服务端 ServerSocketChannel Y
服务端 SocketChannel Y Y
上一篇下一篇

猜你喜欢

热点阅读