程序员

五.Selector

2017-09-29  本文已影响0人  蜗牛1991

一.Selector是什么

connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT
accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT
read:读事件,对应值为SelectionKey.OP_READ
write:写事件,对应值为SelectionKey.OP_WRITE

二.层次图

image.png
  public static Selector open() throws IOException {  
        return SelectorProvider.provider().openSelector();  
    } 
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
 void cancel(SelectionKey k) {                       // package-private
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
protected Set selectedKeys;//已经操作事件准备就绪的选择key(为解决1.4bug而存在) 
protected HashSet keys;//与选择器关联的key集合(为解决1.4bug而存在)  
private Set publicKeys;//外部访问key集合的代理,真正使用的  
private Set publicSelectedKeys;//外部访问就绪key集合代理,真正使用的    ;
public Set<SelectionKey> selectedKeys() {...return this.publicSelectedKeys; }}
public Set<SelectionKey> keys() { ....return this.publicKeys;}
 public int select() throws IOException {
        return this.select(0L);
    }
  private int lockAndDoSelect(long var1//**超时时间**/) throws IOException {
        synchronized(this) {
            if(!this.isOpen()) {
                throw new ClosedSelectorException();
            } else {
                Set var4 = this.publicKeys;
                int var10000;
                synchronized(this.publicKeys) {
                    Set var5 = this.publicSelectedKeys;
                    synchronized(this.publicSelectedKeys) {
                        var10000 = this.doSelect(var1);
                    }
                }

                return var10000;
            }
        }
    }
 protected abstract int doSelect(long var1) throws IOException;
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        if(!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        } else {
            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
           //设置key的附加物  
            var4.attach(var3);
            Set var5 = this.publicKeys;
            synchronized(this.publicKeys) {
            //完成实际的注册工作 
                this.implRegister(var4);
            }
            //设置key的兴趣事件集 ,var2是 SelectionKey关系值
            var4.interestOps(var2);
            return var4;
        }
    }
    //待子类实现  
   protected abstract void implRegister(SelectionKeyImpl selectionkeyimpl);

背景知识

1.FileDescriptor:文件描述符用于描述系统底层的特殊的结构句柄,可以被用来表示开放文件、开放
套接字等,包含 private int fd;//文件描述值 和private long handle;//初始化文件描述句柄
2.句柄:标识应用程序中的不同对象应用程序能够通过句柄访问相应的对象的信息,但是句柄不是指针,程序不能利用句柄来直接阅读文件中的信息。如果句柄不在IO文件中,它是毫无用处的。 句柄是Windows用来标志应用程序中建立的或是使用的唯一整数,大量使用了句柄来标识对象。

    private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量  
    private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量  
    private SelectionKeyImpl channelArray[];//选择器关联通道集合  
    private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合  
   private final List threads = new ArrayList();//选择操作线程集合  
    private final FdMap fdMap = new FdMap();//存放选择key文件句柄与选择key映射关系的Map  
   private final SubSelector subSelector = new SubSelector();//子选择器  
    private int totalChannels;//注册到选择器的通道数量  
    private int threadsCount;//选择线程数  
    private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操的管道  
    private final int wakeupSourceFd;//唤醒管道源通道文件句柄
    private final int wakeupSinkFd;//唤醒管道sink通道文件句柄
  //四个同步锁
    private Object closeLock;//选择器关闭同步锁  
    private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步  
    private final StartLock startLock = new StartLock();//选择操作开始锁  
    private final FinishLock finishLock = new FinishLock();//选择操作结束锁  
  //初始化
 WindowsSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();//获取句柄
        SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
        var2.sc.socket().setTcpNoDelay(true);
        this.wakeupSinkFd = var2.getFDVal();//获取句柄
        this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    }

fdMap:存放选择key文件句柄与选择key的HashMap

   private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) {
            return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1));
        }

pollWrapper: 存放选择key和通道及其相关兴趣事件到本地内存

 private static final short EVENT_OFFSET = 4;//兴趣事件开始位置  
 static short SIZE_POLLFD = 8;//句柄长度int(4)+兴趣事件(4) 
 void addWakeupSocket(int var1//索引, int var2//句柄) {
        this.putDescriptor(var2, var1);
        this.putEventOps(var2, 1);
    }
//将文件描述放在索引var2上
void putDescriptor(int var1, int var2) {
        this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
    }
//存放索引文件描述信息的兴趣操作事件 
void putEventOps(int var1, int var2) {
        this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
    }
image.png
背景知识
在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。在linux2.6(准确来说是2.5.44)由内核直接支持的方法。epoll解决了select和poll的缺点。epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,而不是在等待的时候重复拷贝,保证了每个fd在整个过程中只会拷贝1次。epoll它所支持的fd上限是最大可以打开文件的数目,具体数目可以cat /proc/sys/fs/file-max查看,一般来说这个数目和系统内存关系比较大。epoll在注册新的事件时,为每个fd指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的fd加入一个就绪表中。(所以epoll实际只需要遍历就绪表)。
参考:Linux下I/O多路复用select, poll, epoll 三种模型的Python使用

doSelect方法:其中 subSelector.poll() 是select的核心,由native函数poll0实现,SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key。每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock

 protected int doSelect(long var1) throws IOException {
       ......
       this.subSelector.poll();        
       .....
    }
 private int poll() throws IOException {
        return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
 }
 private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);

SelectionKeyImpl保存注册时的channel、selector、event以及保存在pollWrapper的偏移位置index。

implRegister方法:首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。

protected void implRegister(SelectionKeyImpl selectionkeyimpl)  
{  
    //同步关闭锁,以防在注册的过程中,选择器被关闭  
    synchronized(closeLock)  
    {  
        if(pollWrapper == null)  
            //文件描述包装集合为null,即选器已关闭  
            throw new ClosedSelectorException();  
        growIfNeeded();//  
        channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合  
        selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引  
        fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap  
        keys.add(selectionkeyimpl);//添加key到key集合  
     //将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper  
        pollWrapper.addEntry(totalChannels, selectionkeyimpl);  
        totalChannels++;//通道计数器自增  
    }  
}  
void addEntry(int var1, SelectionKeyImpl var2) {
      //epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,
        this.putDescriptor(var1, var2.channel.getFDVal());
    }

参考:
NIO源码分析
深入浅出NIO Socket实现机制

三.总结

四.实例(聊天室的实现)

public class TestNonBlockingNIO1 {

    //客户端
    @Test
    public void client() throws IOException{
        //1. 获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        //2. 切换非阻塞模式
        sChannel.configureBlocking(false);

        //3. 分配缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);

        //4. 发送数据给服务端
        Scanner scan = new Scanner(System.in);

        while(scan.hasNext()){
            String str = scan.next();

            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }

        //5. 关闭通道
        sChannel.close();
    }

    //服务端
    @Test
    public void server() throws IOException{
        //1. 获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        //2. 切换非阻塞模式
        ssChannel.configureBlocking(false);

        //3. 绑定端口号
        ssChannel.bind(new InetSocketAddress(9898));

        //4. 获取选择器
        Selector selector = Selector.open();

        //5. 将通道注册到选择器上, 并且指定“监听事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        //6. 轮询监听选择器上的“准备就绪”的事件
        while(selector.select() > 0){

            //7. 获取当前选择器上所有“准备就绪”的选择键(监听事件)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while(it.hasNext()){
                //8. 获取当前准备就绪的选择键
                SelectionKey sk = it.next();

                //9. 判断具体是哪个事件“准备就绪”
                if(sk.isAcceptable()){
                    //10.若接收状态就绪,获取当前客户端的连接
                    SocketChannel sChannel = ssChannel.accept();

                    //11.切换非阻塞式
                    sChannel.configureBlocking(false);

                    //12.将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13.若“读就绪”,获取当前选择器上就绪状态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();

                    //14.读取数据
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    int len = 0;
                    while((len = sChannel.read(buf)) > 0){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }

                //15.取消选择键
                it.remove();
            }
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读