JAVANIO -选择器02 Selector原理和使用

2019-07-23  本文已影响0人  贪睡的企鹅

Selector 概述

Selector 一般称为选择器,用来作为SelectableChannel通道的多路复用器。SelectableChannel类型通道可以被注册到多路复用器,通过多路复用器监听感兴趣的事件,这样就可以通过Selector实现单个线程可以管理多个SelectableChannel通道,从而管理多个网络连接。

image

监听事件

当SelectableChannel通道注册到Selector多路复用器时需要指定感兴趣的事件。

//socketChannel注册到选择器中监听读取到达事件
socketChannel.register(selector, SelectionKey.OP_READ);

事件类型定义在SelectionKey类的静态常量中,使用二进制位运算得到的整数。

public abstract class SelectionKey {

...省略代码
//一个通道已准备好读取
public static final int OP_READ = 1 << 0;

//一个通道已准备好写入
public static final int OP_WRITE = 1 << 2;

//与远程服务器建立连接。 
public static final int OP_CONNECT = 1 << 3;

// ServerSocketChannel接受连接。 
public static final int OP_ACCEPT = 1 << 4;

如果SelectableChannel通道对多个选择键感兴趣,可以使用‘|’位运算后在注册到选择器

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 
socketChannel.register(selector, interestSet);

不同SelectableChannel子类(通道)支持的SelectionKey(选择键)不同。
可以通过validOps函数获取其支持的监听事件

    //ServerSocketChannel 支持的选择键
    public final int validOps() {
        return SelectionKey.OP_ACCEPT;
    }
    
    //socketChannel 支持的选择键
    public final int validOps() {
        return (SelectionKey.OP_READ
                | SelectionKey.OP_WRITE
                | SelectionKey.OP_CONNECT);
    }

SelectionKey

一个Selector中可以注册多通道,不同通道在选择器中被封装成为SelectionKey对象。

public class SelectionKeyImpl extends AbstractSelectionKey {
    /** 通道 **/
    final SelChImpl channel;
    /** 多路复用器**/
    public final SelectorImpl selector;
    private int index;
    /** 感兴趣的事件 **/
    private volatile int interestOps;
    /** 就绪的事件**/
    private int readyOps;

从SelectionKeyImpl定义可以看出相同通道注册多次感兴趣选择键对应到Selector中SelectionKey对象是同一个。

SocketChannel ch1 = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
serverSocketChannel.close();
SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);

SocketChannel ch2 = serverSocketChannel2.accept();
socketChannel.configureBlocking(false);
SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
image

事件管理机制

Seclect内部存在三个集合来管理SelectionKey。监听事件通道集合(publicKeys),通道事件就绪集合(publicSelectedKeys),取消监听通道(cancelKeys)

image

案例

    /**
     * 通过register,可以将Channel通道封装成SelectionKey对象添加到keysSet集合中
     */
    @Test
    public void test_keysSet_add() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);

        /** 获取publicKeys集合 **/
        Set<SelectionKey> selectionKeys_ = selector.keys();
        /** 获取注册到选择器中Channel通道的数量**/
        System.out.println("selector.keys.size()" + selector.keys());

        /** 遍历SelectionKey**/
        Iterator<SelectionKey> iterator_ = selectionKeys_.iterator();
        while (iterator_.hasNext()) {
            SelectionKey key = iterator_.next();
            System.out.println(key);
        }
    }


register1sun.nio.ch.SelectionKeyImpl@7f560810
register2sun.nio.ch.SelectionKeyImpl@69d9c55
selector.keys.size()[sun.nio.ch.SelectionKeyImpl@7f560810, sun.nio.ch.SelectionKeyImpl@69d9c55]
sun.nio.ch.SelectionKeyImpl@7f560810
sun.nio.ch.SelectionKeyImpl@69d9c55    
    @Test
    public void test_keysSet_add2_server() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);


        boolean is_Run=true;
        while (is_Run) {

            /** 阻塞等待事件到达**/
            selector.select();

            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    serverSocketChannel.close();
                    SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
                    SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);
                    // 同一个通道注册多次事件,返回的都是同一个 SelectionKey
                    System.out.println(register3==register4);
                }
            }
        }
    }


    @Test
    public void test_keysSet_add2_client() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        SocketChannel socketChannel2 = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel2.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
            socketChannel2.connect(new InetSocketAddress("localhost", 7777));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

   
true
true    

无法手动对publicKeys集合做修改,如修改会抛出UnsupportedOperationException异常

    @Test
    public void test_keysSet_del1() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);

        /** 获取publicKeys集合 **/
        Set<SelectionKey> selectionKeys_ = selector.keys();
        //手动删除会抛出异常
        selectionKeys_.remove(register2);
    }
    @Test
    public void test_keysSet_del3_server() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);

        // 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行后将关闭通道SelectionKey从集合中删除
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(15);
                    Set<SelectionKey> selectionKeys = selector.keys();
                    System.out.println(selectionKeys);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        boolean is_Run=true;
        while (is_Run) {
            // 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行前publicKeys集合
            Set<SelectionKey> selectionKeys2 = selector.keys();
            System.out.println(selectionKeys2);
            /** 阻塞等待事件到达**/
            selector.select();

            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //如果注册在选择器中通道对应SelectionKey被关闭,选择器会在publicKeys集合中标记这个SelectionKey,并在下次selector.select()方法后删除
                    key.cancel();
                    System.out.println("selector.keys pre_del()");
                }
            }
        }
    }

    @Test
    public void test_keysSet_del3_client() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
selector.keys pre_del()
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
[sun.nio.ch.SelectionKeyImpl@13a57a3b]    
 @Test
    public void test_keysSet_del2_server() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);

        /** 用来查看通道被关闭后并调用selector.select()方法后keys集合会将关闭通道SelectionKey从集合中删除 **/
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(15);
                    Set<SelectionKey> selectionKeys = selector.keys();
                    System.out.println(selectionKeys);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        boolean is_Run=true;
        while (is_Run) {
            // 在通道关闭后,select()方法执行前答应下当前keys集合
            Set<SelectionKey> selectionKeys2 = selector.keys();
            System.out.println(selectionKeys2);
            /** 阻塞等待事件到达**/
            selector.select();

            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //选择器中通道被关闭后,选择器会在keysSet集合中标记,并在下次selector.select()方法后删除
                    serverSocketChannel.close();
                    System.out.println("selector.keys pre_del()");
                }
            }
        }
    }

    @Test
    public void test_keysSet_del2_client() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
selector.keys pre_del()
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
[sun.nio.ch.SelectionKeyImpl@13a57a3b]    
 @Test
    public void test_publicSelectedKeys2_server() throws Exception {
        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);


        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);

        Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
        boolean is_Run=true;
        while (is_Run) {

            /** 阻塞等待事件到达**/
            selector.select();
            /** 获取到达事件SelectionKey集合**/
            selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                //在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();

                    if(socketChannel==null){
                        System.out.println("重复连接");
                    }
                    InetSocketAddress localAddress =   (InetSocketAddress) serverSocketChannel.getLocalAddress();
                    System.out.println(localAddress.getPort()+"被连接了");
                    System.out.println("isAcceptable");
                }
            }
        }
    }

    @Test
    public void test_publicSelectedKeys2_client1() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test_publicSelectedKeys2_client2() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
1 注释掉iterator.remove();,顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client2  

8888被连接了
isAcceptable
重复连接
8888被连接了
isAcceptable
7777被连接了
isAcceptable

2 打开注释iterator.remove();顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client

8888被连接了
isAcceptable
7777被连接了
isAcceptable


不同的事件处理

OP_READ

@Test
    public void test_publicSelectedKeys_server2() throws Exception {
        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
        Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
        boolean is_Run=true;
        while (is_Run) {

            /** 阻塞等待事件到达**/
            System.out.println("selector.selectedKeys:"+selectionKeys);
            selector.select();
            System.out.println("selector.selectedKeys:"+selectionKeys);
            /** 获取到达事件SelectionKey集合**/
            selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                    System.out.println("isAcceptable");
                }else if(key.isReadable()){
                    SocketChannel socketChannel = (SocketChannel)key.channel();
                    //注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
                    //ByteBuffer allocate = ByteBuffer.allocate(50);
                    //socketChannel.read(allocate);
                    //System.out.println(new String(allocate.array()));
                }
            }
        }
    }

    @Test
    public void test_publicSelectedKeys_client2() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
            socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

OP_WRITE

如果通道注册了OP_READ事件,选择器会自动将通道对应SelectionKey添加到publicSelectedKeys集合,,用户线程遍历需SelectedKeys集合获取通道,调用write方法发送数据。发送数据完毕,就可以将SelectionKey从publicSelectedKeys集合中删除。不同下次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中,也就是无法停止,这时我们只能通过设置SelectionKey.interestOps()重新设置事件,将OP_WRITE事件去掉,那么下次下次调用select方法时就不会重新添加到publicSelectedKeys集合中。

    @Test
    public void test_publicSelectedKeys_client3() throws Exception {
        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost", 7777));
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        boolean is_Run = true;
        while (is_Run) {
            /** 阻塞等待事件到达**/
            selector.select();

            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isConnectable()) {
                    socketChannel = (SocketChannel) key.channel();
                    while (!socketChannel.finishConnect()){
                    }
                    /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_READ(当客户端请求数据时事件到达被添加到selectedKeys集合中) **/
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                else if (key.isReadable()) {
                    socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(100);
                    int read = socketChannel.read(byteBuffer);
                    System.out.println(new String(byteBuffer.array()));
                }
            }
        }
    }

OP_ACCEPT

    @Test
    public void test_publicSelectedKeys_server1() throws Exception {
        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
        Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
        boolean is_Run=true;
        while (is_Run) {

            /** 阻塞等待事件到达**/
            System.out.println("selector.selectedKeys:"+selectionKeys);
            selector.select();
            System.out.println("selector.selectedKeys:"+selectionKeys);
            /** 获取到达事件SelectionKey集合**/
            selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    //注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
                    //serverSocketChannel.accept();
                    System.out.println("isAcceptable");
                }
            }
        }
    }

    @Test
    public void test_publicSelectedKeys_client1() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

select

select()方法提供给用户线程判断是否存在就绪通道,如果不存在就绪通道则会阻塞当前线程,直到某个通道的事件到达。

处理流程

image

需要注意如下特性

案例
 /**
     * interrupt()中断函数可以用来释放所有selector.select()阻塞
     */
    @Test
    public void test_select() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();
        Thread currentThread = Thread.currentThread();

        /** 开启一个线程5s后调用close()方法关闭选择器**/
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.SECONDS.sleep(5);
                    currentThread.interrupt();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));

        /** 设置为非阻塞IO模型 **/
        serverSocketChannel.configureBlocking(false);

        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        /** 阻塞等待事件到达**/
        //可以被中断
        selector.select();

        /** 获取到达事件SelectionKey集合**/
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();

        /** 遍历SelectionKey**/
        while (iterator.hasNext()){
            SelectionKey key = iterator.next();
            /** 判断是否是OP_ACCEPT事件**/
            if(key.isAcceptable()){
                /** 从SelectionKey获取对应通道ServerSocketChannel**/
                ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                /** 获取SocketChannel**/
                SocketChannel accept = socketChannel.accept();
                accept.close();
            }
        }
        serverSocketChannel.close();
        System.out.println("over");
    }
//over    
 @Test
    public void test_select3() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();
        Thread currentThread = Thread.currentThread();

        /** 开启一个线程5s后调用close()方法关闭选择器**/
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.SECONDS.sleep(5);
                    selector.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));

        /** 设置为非阻塞IO模型 **/
        serverSocketChannel.configureBlocking(false);

        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        /** 阻塞等待事件到达**/
        //可以被中断
        selector.select();

        serverSocketChannel.close();
        System.out.println("over");
    }
@Test
    public void test_select1() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();
        Thread currentThread = Thread.currentThread();

        /** 开启一个线程5s后调用wakeup()方法**/
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.SECONDS.sleep(5);
                    selector.wakeup();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));

        /** 设置为非阻塞IO模型 **/
        serverSocketChannel.configureBlocking(false);

        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        /** 阻塞等待事件到达**/
        //可以被中断
        selector.select();


        /** 获取到达事件SelectionKey集合**/
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();

        /** 遍历SelectionKey**/
        while (iterator.hasNext()){
            SelectionKey key = iterator.next();
            /** 判断是否是OP_ACCEPT事件**/
            if(key.isAcceptable()){
                /** 从SelectionKey获取对应通道ServerSocketChannel**/
                ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                /** 获取SocketChannel**/
                SocketChannel accept = socketChannel.accept();
                accept.close();
            }
        }
        serverSocketChannel.close();
        System.out.println("over");
    }
 @Test
    public void test_select4_server() throws Exception {
        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel1.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);


        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel2.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);


        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel3 = ServerSocketChannel.open();
        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel3.bind(new InetSocketAddress("localhost", 6666));
        /** 设置为非阻塞IO模型 **/
        serverSocketChannel3.configureBlocking(false);
        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        SelectionKey register3 = serverSocketChannel3.register(selector, SelectionKey.OP_ACCEPT);

        Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
        boolean is_Run=true;
        while (is_Run) {

            /** 阻塞等待事件到达**/
            int updatekey = selector.select();
            /** 获取到达事件SelectionKey集合**/
            selectionKeys = selector.selectedKeys();
            System.out.println("updatekey:"+updatekey);

            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                //在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
                //iterator.remove();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                }
            }
        }
    }

    @Test
    public void test_select4_client1() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 6666));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test_select4_client2() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        SocketChannel socketChannel1 = SocketChannel.open();
        try {
            socketChannel.configureBlocking(true);
            socketChannel1.configureBlocking(true);
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
            socketChannel1.connect(new InetSocketAddress("localhost", 8888));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }   

close

 /**
     * 调用close()关闭选择器会导致
     *
     * 1 释放所有selector.select()阻塞
     *
     * 2 除了再次调用close()和wakeup()方法外,调用selector()的其他方法均出现异常。
     *
     * 3 如果选择器已经关闭则在次调用此方法将不起作用
     */
    @Test
    public void test_close() throws Exception {

        /** 实例化一个选择器对象 **/
        Selector selector = Selector.open();

        /** 开启一个线程5s后调用close()方法关闭选择器**/
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.SECONDS.sleep(5);
                    selector.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();

        /** 创建服务器套接字通道 ServerSocketChannel **/
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        /** 绑定监听 InetSocketAddress **/
        serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));

        /** 设置为非阻塞IO模型 **/
        serverSocketChannel.configureBlocking(false);

        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        /** 阻塞等待事件到达**/
        selector.select();

        /** 获取到达事件SelectionKey集合**/
        //此出抛出异常
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();

        /** 遍历SelectionKey**/
        while (iterator.hasNext()){
            SelectionKey key = iterator.next();
            /** 判断是否是OP_ACCEPT事件**/
            if(key.isAcceptable()){
                /** 从SelectionKey获取对应通道ServerSocketChannel**/
                ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                /** 获取SocketChannel**/
                SocketChannel accept = socketChannel.accept();
                accept.close();
            }
        }
        serverSocketChannel.close();
    }
上一篇 下一篇

猜你喜欢

热点阅读