NIO的SelectionKey(选择键)

2018-01-01  本文已影响526人  秦汉邮侠

要点

操作集

操作属性

0表示什么?

方法列表

参考代码

/**
 * Single-threaded reactor: owns one {@link Selector}, listens on a server
 * channel, and runs the {@link Runnable} attached to each selected key.
 *
 * <p>The server channel's key carries an {@link Acceptor}; keys of accepted
 * connections carry whatever their handler attached to them.
 */
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    /**
     * Opens the selector and a non-blocking server channel bound to the local
     * host address on {@code port}, registers it for {@code OP_ACCEPT}, and
     * attaches an {@link Acceptor} to the resulting key.
     *
     * <p>If any step fails, everything opened so far is closed before the
     * exception propagates, so a failed construction does not leak descriptors.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or channel cannot be opened, bound,
     *                     or registered
     */
    Reactor(int port) throws IOException {
        Selector openedSelector = null;
        ServerSocketChannel openedServer = null;
        boolean ready = false;
        try {
            openedSelector = Selector.open();
            openedServer = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
            openedServer.socket().bind(address);
            openedServer.configureBlocking(false);
            // Register the server channel with the selector for accept events.
            SelectionKey sk = openedServer.register(openedSelector, SelectionKey.OP_ACCEPT);

            System.out.println("-->Start serverSocket.register!");

            // Bind an Acceptor to the key via its attachment; dispatch() runs it
            // whenever this key is selected.
            sk.attach(new Acceptor());
            System.out.println("-->attach(new Acceptor()!");
            ready = true;
        } finally {
            if (!ready) {
                // Construction failed part-way: release whatever was opened.
                if (openedServer != null) {
                    try {
                        openedServer.close();
                    } catch (IOException ignored) {
                        // Already failing; the original exception is the one to report.
                    }
                }
                if (openedSelector != null) {
                    try {
                        openedSelector.close();
                    } catch (IOException ignored) {
                        // Already failing; the original exception is the one to report.
                    }
                }
            }
        }
        selector = openedSelector;
        serverSocket = openedServer;
    }


    /**
     * Event loop: blocks in {@code select()}, dispatches every selected key,
     * then clears the selected-key set. Runs until the current thread is
     * interrupted or the selector throws an {@link IOException}.
     */
    @Override
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                // One pass over the keys that became ready: the server key runs
                // the Acceptor, connection keys run their attached handler.
                for (SelectionKey key : selected) {
                    dispatch(key);
                }
                // The selector never removes keys from this set itself.
                selected.clear();
            }
        } catch (IOException ex) {
            System.out.println("reactor stop!" + ex);
        }
    }

    /**
     * Runs the {@link Runnable} attached to {@code k}, if there is one.
     *
     * @param k a key returned by the selector
     */
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    /**
     * Accepts one pending connection on the server channel and hands it to a
     * new {@code Handler} registered on this reactor's selector.
     */
    class Acceptor implements Runnable { // inner
        @Override
        public void run() {
            SocketChannel c = null;
            try {
                System.out.println("-->ready for accept!");
                c = serverSocket.accept();
                if (c != null) {
                    // The Handler registers itself with the selector in its constructor.
                    new Handler(selector, c);
                }
            } catch (IOException ex) {
                // Report instead of swallowing, and do not leak a channel that was
                // accepted but could not be handed to a Handler.
                System.err.println("accept failed: " + ex);
                if (c != null) {
                    try {
                        c.close();
                    } catch (IOException ignored) {
                        // Nothing more can be done for this connection.
                    }
                }
            }
        }
    }
}
/**
 * Per-connection handler driven by a selector: reads a request while in the
 * {@code READING} state, then writes the response while in {@code SENDING}.
 *
 * <p>The instance attaches itself to its {@link SelectionKey}, so whoever
 * dispatches selected keys invokes {@link #run()} when the channel is ready.
 * {@link #inputIsComplete()}, {@link #outputIsComplete()} and
 * {@link #process()} are placeholders in this skeleton.
 */
public class Handler implements Runnable {

    /**
     * Capacity in bytes of each per-connection buffer. Must stay small:
     * every connection allocates two of these on the heap.
     */
    private static final int BUFFER_CAPACITY = 1024;

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(BUFFER_CAPACITY);
    ByteBuffer output = ByteBuffer.allocate(BUFFER_CAPACITY);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    /**
     * Switches {@code c} to non-blocking mode, registers it with {@code sel},
     * attaches this handler to the key and enables read interest.
     *
     * @param sel selector that will report readiness for this connection
     * @param c   the accepted connection
     * @throws IOException if the channel cannot be configured or registered
     */
    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        // Selectors require non-blocking channels.
        c.configureBlocking(false);
        // Interest set 0: register without watching any event yet, so nothing
        // can be selected before the attachment below is in place.
        sk = socket.register(sel, 0);
        // Attach this handler so its run() is invoked when the key is selected.
        sk.attach(this);
        // Start by watching for readable data only; write interest is enabled
        // later, once there is a response to send.
        sk.interestOps(SelectionKey.OP_READ);
        // Make a select() that may be blocked return promptly.
        sel.wakeup();
    }

    /** Placeholder: reports whether a full request has been read. Always {@code false} here. */
    boolean inputIsComplete() {
        return false;
    }

    /** Placeholder: reports whether the full response has been written. Always {@code false} here. */
    boolean outputIsComplete() {
        return false;
    }

    /**
     * Placeholder for request processing; the work could be handed to a thread pool.
     * NOTE(review): presumably this should fill {@code output} and flip it
     * before {@link #send()} runs - confirm when implementing.
     */
    void process() {

    }


    /**
     * Performs the step that matches the current state. An I/O failure is
     * reported and the connection is closed, so a broken channel is not left
     * registered with the selector.
     */
    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            System.err.println("handler I/O error, closing connection: " + ex);
            closeConnection();
        }

    }


    /**
     * Reads available bytes into {@code input}. On end-of-stream the
     * connection is closed; once the request is complete it is processed and
     * the key is switched to write interest.
     *
     * @throws IOException if the read fails
     */
    void read() throws IOException {
        int bytesRead = socket.read(input);
        if (bytesRead < 0) {
            // Peer closed its side: no more input will ever arrive.
            closeConnection();
            return;
        }
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * Writes pending bytes from {@code output}; when the response is complete
     * the key is cancelled and the channel closed.
     *
     * @throws IOException if the write fails
     */
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            // Cancelling the key alone would leave the socket open with
            // nothing watching it, so close the channel as well.
            closeConnection();
        }
    }

    /** Cancels the selection key and closes the channel, ignoring close errors. */
    private void closeConnection() {
        sk.cancel();
        try {
            socket.close();
        } catch (IOException ignored) {
            // The connection is being discarded; nothing further to do.
        }
    }

}

参考来源

上一篇 下一篇

猜你喜欢

热点阅读