JavaNio-Selector

2021-03-30  本文已影响0人  大风过岗

一、概览

在这篇文章中,我们将探索一下JavaNIO的Selector组件。selector提供了一个机制,该机制可以监视一个或多个NIO通道,当这些通道上的某些操作已就绪时,可以及时地识别到。

利用这种方式,单个线程就可以管理多个通道,因此也就可以管理多个网络连接。这也为编写高可用高扩展的网络服务器提供了技术保障。

二、 为什么要使用Selector

在Selector的帮助下,我们可以使用一个而非多个线程来管理多个通道。对于操作系统而言,线程间的上下文切换是十分昂贵的,而且使用的线程过多也极大地占据内存空间。

因此,使用的线程数越少越好。然而,需要记住的是,现代操作系统和cpu可以很好地处理多任务,因此,多线程的消耗也在随着时间而减少。

这里,我们将看下我们是如何利用selector来达到一个线程管理多个通道的。

注意: selector不仅可以帮助你读取数据,他们同样可以监听网络连接,并且提供在多个低速通道间写数据。

三、设置

要想使用selector的话,我们并不需要什么特殊的配置。我们所需要的类都在java.nio包下。我们只需要引入我们需要的包就可以啦。
之后,我们就可以把多个channel注册到此selector对象上了。当这些通道上有IO活动发生时,该selector自会提醒我们的。这也就解释了,我们何以能够在依赖于单线程完成从大量数据源中读写数据。

注册到selector上的channel必须是SelectableChannel的子类,因为只有这种类型的channel才能设置为非阻塞模式。

四、创建一个Selector

调用Selector类的open方法即可创建一个selector实例,此时它是用系统默认的selector提供者来创建一个新的selector的。

Selector selector = Selector.open();

五、注册selectable Channel

如果你想要让selector去监听某个通道的话,那么你首先需要把这些需要监听的channel都注册到该selector上。我们可以调用channel的register方法来完成注册。

但是在把某个channel注册到selector上之前,它必须被设置为非阻塞模式(non-blocking mode):

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

当然了,这也就意味着,我们无法把FileChannel和selector放在一起使用,因为FileChannel无法切换到非阻塞模式。通常我们都是把socket channel和selector放在一起使用。

channel.register()方法的第一个参数是我们先前创建的Selector对象,第二个参数是我们所关注的该channel上发生的事件。

我们总共可监听的事件有四种,每一个都是由SelectionKey中的一个常量来表示的:

. Connect
- 当某个客户端试图连接服务器时,就会触发该事件。由SelectionKey.OP_CONNECT来表示
. Accept
- 当服务器接收某个客户端的连接时,就会触发该事件。由SelectionKey.OP_ACCEPT来表示
. Read
- 当服务器已准备好从通道中读取数据时,就会触发该事件。由SelectionKey.OP_READ来表示
. Write
- 当服务器已准备好向该通道中写数据时,就会触发该事件。由SelectionKey.OP_WRITE来表示

方法的返回值是一个SelectionKey对象,这个对象就代表了某个通道注册到selector之后的注册结果。

六、SelectionKey 对象

正如我们在上面所看的那样,当我们把某个channel注册到selector上之后,我们就会得到一个SelectionKey对象,该对象中存储了通道注册的信息。

6.1 事件集

事件集参数定义了我们希望该selector监听此通道上的哪些事件。它是一个integer值,我们可以通过以下方式获取相关信息。

首先,我们可以通过SelectionKey的interestOps方法获取到事件集,然后我们把Selectionkey和此值做"与运算",我们就可以得到一个布尔值来表明此事件是不是我们监听的事件。

```
  int interestSet = selectionKey.interestOps();

  boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
  boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
  boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
  boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

```

6.2 就绪集

就续集定义了某个channel上有哪些事件是准备就绪的。它是一个整型数字。

我们有一种便捷的方式来获取的某个channel的就绪集:

```
  selectionKey.isAcceptable();
  selectionKey.isConnectable();
  selectionKey.isReadable();
  selectionKey.isWriteable();
```

6.3 通道

要想根据SelectionKey访问channel的话,有一个非常简单的方法:

Channel channel = key.channel();

6.4 Selector

从SelectionKey对象上获取对应的Selector对象也非常简单:

Selector selector = key.selector();

6.5 附属对象

我们可以向SelectionKey上附属一个对象,因为有时我们或许给某个channel分配一个自定义ID或者附属某个java对象,以便于可以跟踪该channel的行为情况:
我们可以使用SelectionKey的attach()方法来很方便地做到这一点:

key.attach(object);

Object object = key.attachment();

还有一种方式,就是在channel注册的时候,我们把需要附属的对象作为参数传递给register方法:

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

七、channel key选择

到目前为止,我们已经看到了如何创建一个selector对象,如何向selector上注册channel,以及观察SelectionKey的属性信息。

这才进行到一半,现在我们需要持续地挑选就绪事件:

int channels = selector.select();

此方法是一个阻塞方法,它会一直阻塞住直到有就绪事件到来。方法的返回值代表的是处于就绪状态channel个数。

紧接着,我们通常会获取需要处理的selectionKey

 Set<SelectionKey>  selectionKeys = selector.selectedKeys();

后面,我们只需要遍历这个集合,获取到对应的channel,并执行相应的处理动作即可。

八、完整示例

我们这里有个简单的服务端和客户端的简单示例,用以演示一下,使用selector是如何编写网络程序的。

8.1 服务端程序

public class EchoServer {

    private static final String POISION_PILL = "POISON_PILL";


    public static void main(String[] args) throws IOException {

        Selector selector   = Selector.open();

        ServerSocketChannel  serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost",5454));
        serverSocket.configureBlocking(false);
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer  buffer = ByteBuffer.allocate(256);

        while (true){

            int select = selector.select();// selecting the ready set,this method blocks until at least one channel is ready for an operation

            Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 获取可用于处理的selected keys
            //遍历这些已准备就绪的事件,并处理
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()){

                SelectionKey key = iter.next();
                //逐一对这些已就绪事件进行处理

                //如果是接收网络连接的事件
                if(key.isAcceptable()){
                    register(selector,serverSocket);
                }

                //如果是可读事件
                if(key.isReadable()){
                    answerWithEcho(buffer,key);
                }
                iter.remove(); //由此可以看出,使用迭代器进行遍历,在遍历时,可以执行移出动作
            }
        }

    }



    private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {

        //如果发现,该key是网络套接字接收事件,则接收此客户端连接,并把此客户端连接注册到selector上
        SocketChannel  client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector,SelectionKey.OP_READ);
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {

        SocketChannel   client = (SocketChannel) key.channel();  //根据key取出对应的channel
        client.read(buffer); //把客户端发来的数据读进buffer中,可以和之前的相比就可看出
                             //之前是一个字节一个字节的以流的形式读取,现在是批量地读进buffer中


        String content = new String(buffer.array());
        if(content.trim().equals(POISION_PILL)){
            //如果收到了"POISION_PILL"字符串的话,则关闭此客户端
            client.close();
            System.out.println("Not accepting client messages anymore");
        }else {
            buffer.flip(); //
            client.write(buffer);
            buffer.clear(); //清空buffer
        }

    }


    public static Process  start() throws IOException,InterruptedException {

        String javaHome = System.getProperty("java.home");
        String  javaBin = javaHome + File.separator + "bin"  + File.separator + "java";
        String classPath= System.getProperty("java.class.path");
        String className= EchoServer.class.getCanonicalName();

        ProcessBuilder  builder = new ProcessBuilder(javaBin,"-cp", classPath,className);

        return builder.start();
    }

}


8.2 客户端程序

public class EchoClient {


    private static SocketChannel  client;
    private static ByteBuffer buffer;
    private static EchoClient instance;



    public static  EchoClient start(){

        if(instance == null){
            instance = new EchoClient();
        }
        return instance;
    }

    public static void stop()throws IOException{
        client.close();
        buffer = null;
    }


    private EchoClient(){

        try{

            client = SocketChannel.open(new InetSocketAddress("localhost",5454));
            buffer = ByteBuffer.allocate(256);
        }catch (IOException e){
            e.printStackTrace();
        }
    }


    public String sendMessage(String msg){

        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;

        try{
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            buffer.clear();
        }catch (IOException e){
            e.printStackTrace();
        }
        return response;
    }

}


8.3 测试程序

public class EchoTest {

    Process  server;

    EchoClient  client;


    @Before
    public void setup()throws IOException,InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }



    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {

        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        System.out.println(resp1);
        System.out.println(resp2);
    }


    @Test
    public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {

        Pipe pipe = Pipe.open();
        Selector selector = Selector.open();
        SelectableChannel channel = pipe.source();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);

        List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

        CountDownLatch latch = new  CountDownLatch(1);

        new Thread(()->{

            invocationStepsTracker.add(">> Count down");
            latch.countDown();

            invocationStepsTracker.add(">> Count down");
            latch.countDown();

            try {

                invocationStepsTracker.add(">> Start select");
                selector.select();
                invocationStepsTracker.add(">> End select");
            } catch (IOException e) {
                e.printStackTrace();
            }

        }).start();

        invocationStepsTracker.add(">> Start await");
        latch.await();
        invocationStepsTracker.add(">> End await");
        invocationStepsTracker.add(">> Wakeup thread");
        selector.wakeup();
        // clean up
        channel.close();

        System.out.println("============输出StepTracker中的内容================");
        System.out.println(JSONObject.toJSONString(invocationStepsTracker));
    }

    @After
    public void tearDown()throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}


九、Selector.wakeup()

前面,我们已经讲到了,当我们调用selector.select()方法时,此方法会把当前线程阻塞住直到被监听的channel上有就绪事件
发生。但是我们可以在其他线程中调用selector.wakeup()方法来唤醒被此selector阻塞的线程。

调用selector.wakeup()方法产生的结果是:不管是否有通道处于就绪状态,被阻塞的线程都会立即返回,而非继续等待。

我们可以使用CountDownLatch来演示一下,并跟踪一下代码的执行情况:

@Test
   public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {

       Pipe pipe = Pipe.open();
       Selector selector = Selector.open();
       SelectableChannel channel = pipe.source();
       channel.configureBlocking(false);
       channel.register(selector, SelectionKey.OP_READ);

       List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

       CountDownLatch latch = new  CountDownLatch(1);

       new Thread(()->{

           invocationStepsTracker.add(">> Count down");
           latch.countDown();

           invocationStepsTracker.add(">> Count down");
           latch.countDown();

           try {

               invocationStepsTracker.add(">> Start select");
               selector.select();
               invocationStepsTracker.add(">> End select");
           } catch (IOException e) {
               e.printStackTrace();
           }

       }).start();

       invocationStepsTracker.add(">> Start await");
       latch.await();
       invocationStepsTracker.add(">> End await");
       invocationStepsTracker.add(">> Wakeup thread");
       selector.wakeup();
       // clean up
       channel.close();

       System.out.println("============输出StepTracker中的内容================");
       System.out.println(JSONObject.toJSONString(invocationStepsTracker));
   }

参考文献

  1. baeldung-JavaNIO-selector

  2. Oracle文档Selector

上一篇 下一篇

猜你喜欢

热点阅读