JAVA-IO(三)

2019-05-22  本文已影响0人  sschrodinger

JAVA-IO(三)

sschrodinger

2019/05/22


引用



JAVA NIO Selector


多路复用I/O的操作系统实现(linux)

select实现

select 函数提供了一种最基本的 I/O 复用实现。c 语言代码如下:

#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

其中,返回值返回准备就绪的 fd 数目,若超时则为0,若出错则为-1。

第一个参数 maxfdp1 代表的是需要监控的最大套接字加1。因为 linux 的套接字从0依次开始编号,所以,这也代表了监控的套接字数目,从0到 maxfdp1-1 的所有套接字都会被监控。

第二个参数到第四个参数 readset、writeset 和 exceptset 指定我们要让内核测试读、写和异常条件的描述字。我们可以将其想象成一个存放了文件描述符的集合。

note

  • 对于fd_set,有四种操作:
    • void FD_ZERO(fd_set *fdset); //清空集合
    • void FD_SET(int fd, fd_set *fdset);//将一个给定的文件描述符加入集合之中
    • void FD_CLR(int fd, fd_set *fdset);//将一个给定的文件描述符从集合中删除
    • int FD_ISSET(int fd, fd_set *fdset);//检查集合中指定的文件描述符是否可以读写

第五个参数 timeout 代表等待时间,有三种情况:
永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。

处理流程

  1. 使用copy_from_user从用户空间拷贝fd_set到内核空间
  2. 注册回调函数__pollwait
  3. 遍历所有 fd
    • 调用其对应的 poll 方法(对于 socket ,这个 poll 方法是 sock_poll ,sock_poll 根据情况会调用到 tcp_poll ,udp_poll 或者 datagram_poll),以 tcp_poll 为例,其核心实现就是 __pollwait ,也就是上面注册的回调函数。
    • __pollwait 的主要工作就是把 current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于 tcp_poll 来说,其等待队列是 sk->sk_sleep (注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时 current 便被唤醒了。
      poll 方法返回时会返回一个描述读写操作是否就绪的 mask 掩码,根据这个 mask 掩码给 fd_set 赋值。
    • 如果遍历完所有的 fd ,还没有返回一个可读写的 mask 掩码,则会调用 schedule_timeout 使调用 select 的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间( schedule_timeout 指定),还是没人唤醒,则调用 select 的进程会重新被唤醒获得 CPU,进而重新遍历 fd,判断有没有就绪的 fd。

note

  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  • select支持的文件描述符数量太小,默认是1024
poll实现

poll的实现和select没有本质的差别,都是轮询所有的 fd,不累述。

epoll实现

epollselectpoll 的 I/O 多路复用的增强实现版,主要由三个函数组成。分别是epoll_createepoll_ctlepoll_wait。下面详细说明这三个函数。

int epoll_create(int size):初始化 epoll 对象。epoll 初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用 epoll_create 时,会在这个文件系统中创建一个 file 节点。同时 epoll 会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个 list 链表,用于存储准备就绪的事件

note

  • 参数 size 是指操作系统能够获得的最大容量。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):主要用于向 epoll 对象注册(增加)需要监控的文件描述符和需要监控的动作,如 I/O 准备就绪等,或者修改或移除监控的文件描述符。具体的流程为,当执行epoll_ctl时,除了把 socket 句柄放到 epoll 文件系统里 file 对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪 list 链表。所以,当一个 socket 上有数据到了,内核在把网卡上的数据复制到内核中后,就把 socket 插入到就绪链表里。

note

  • 参数 op 表示操作类型,总共有三种:
    • EPOLL_CTL_ADD:注册新的 fdepfd 中;
    • EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
    • EPOLL_CTL_DEL:从 epfd 中删除一个 fd
  • 参数 epoll_event* 表示监听的事件,总共有七种:
    • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    • EPOLLOUT:表示对应的文件描述符可以写;
    • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    • EPOLLERR:表示对应的文件描述符发生错误;
    • EPOLLHUP:表示对应的文件描述符被挂断;
    • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)阻塞函数,观察就绪队列里有没有数据,如有数据则返回。

note

  • 参数 epoll_event* 存储着返回的准备好的事件

对任务的就绪,在 epoll 模型中,总共有两种模式,第一种是 Level Triggered (水平触发模式)和 Edge Triggered (边缘触发)模式。

水平触发的意思是当epoll_ wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。即,只要不处理,就会一直触发这个事件

边缘触发的意思是当epoll_ wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。

note

  • 在 ET 模式下,当有事件发生时,系统只会通知你一次,即在调用epoll_wait返回 fd 后,不管这个事件你处理还是没处理,处理完没有处理完,当再次调用epoll_wait时,都不会再返回该 fd,这样的话程序员要自己保证在事件发生时要及时有效的处理完该事件。例如:fd 发生了 IN 事件,在调用epoll_wait后发现了该时间,程序员要保证在本次轮询中对该 fd 做了读操作,且还要循环调用recv操作,直到读到的recv的返回值小于请求值,或者遇到 EAGAIN 错误,否则,在下次轮询时,如果该 fd 没有再次触发事件,你就没有机会知道这个 fd 需要处理。。
  • 由于在 ET 模式中需要一直处理数据直到处理完成,所以一定要是非阻塞I/O操作,不然可能会饿死其他 I/O。

epoll 编程模板如下:

#include <iostream>
#include <sys/socket.h> 
#include <sys/epoll.h>
#include <netinet/in.h> 
#include <arpa/inet.h>
#include <fcntl.h> 
#include <unistd.h> 
#include <stdio.h>

#define MAXLINE 10 
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555 
#define INFTIM 1000

int main()
{
   int i, maxi, listenfd, connfd, sockfd, epfd, nfds; 
   ssize_t n; 
   char line[MAXLINE];
   socklen_t clilen;
   //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
   struct epoll_event ev,events[20];
   //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256 
   epfd = epoll_create(256);
   listenfd = socket(AF_INET, SOCK_STREAM, 0);

   setnonblocking(listenfd);       //把用于监听的socket设置为非阻塞方式
   ev.data.fd = listenfd;          //设置与要处理的事件相关的文件描述符
   ev.events = EPOLLIN | EPOLLET;  //设置要处理的事件类型
   epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);     //注册epoll事件
   
   
   listen(listenfd, LISTENQ);

   maxi = 0;
   for( ; ; )
   { 
      nfds = epoll_wait(epfd, events, 20, 500); //等待epoll事件的发生
      for(i = 0; i < nfds; ++i)                 //处理所发生的所有事件
      {
         if(events[i].data.fd == listenfd)      //监听事件
         {
            connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen); 
            if(connfd < 0)
            {
               perror("connfd<0");
               exit(1);
            }
            setnonblocking(connfd);           //把客户端的socket设置为非阻塞方式
            ev.data.fd=connfd;                //设置用于读操作的文件描述符
            ev.events=EPOLLIN | EPOLLET;      //设置用于注册的读操作事件
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            //注册ev事件
         }
         else if(events[i].events&EPOLLIN)      //读事件
         {
            if ( (sockfd = events[i].data.fd) < 0)
            {
               continue;
            }
            if ( (n = read(sockfd, line, MAXLINE)) < 0) // 这里和IOCP不同
            {
               if (errno == ECONNRESET)
               {
                  close(sockfd);
                  events[i].data.fd = -1; 
               }
               else
               {
                  std::cout<<"readline error"<<std::endl;
               }
            }
            else if (n == 0)
            {
               close(sockfd); 
               events[i].data.fd = -1; 
            }
            ev.data.fd=sockfd;              //设置用于写操作的文件描述符
            ev.events=EPOLLOUT | EPOLLET;   //设置用于注测的写操作事件 
            //修改sockfd上要处理的事件为EPOLLOUT
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
         } 
         else if(events[i].events&EPOLLOUT)//写事件
         {
            sockfd = events[i].data.fd;
            write(sockfd, line, n);
            ev.data.fd = sockfd;               //设置用于读操作的文件描述符
            ev.events = EPOLLIN | EPOLLET;     //设置用于注册的读操作事件
            //修改sockfd上要处理的事件为EPOLIN
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
         } 
      }
   }
}

Java nio select

框架

select 框架的写法逻辑和 epoll 非常相似,如下使 select 服务器的实现框架:

public class Demo() {
    public void demo() {
    
        //step 1
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);     
        serverChannel.socket().bind(new InetSocketAddress(port));
        
        //step 2
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        //step 3
        while(true){
            int n = selector.select();
            if(n > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    //do something
                    iter.remove();
                }
            }
        }
    }
}

其中,
step 1 创建一个非阻塞的服务器通道并将它绑定到端口上;
step 2 创建一个 selector 对象,并将服务器通道注册到监听列表中;
step 3 通过 select 函数开始选择准备好的通道并进行业务逻辑开展。

对于一个可读可写的服务器,可以在 do some thing 之间填充如下代码片段:

if (selectionKey.isAcceptable()) {
    SocketChannel sc = server.accept();
    sc.configureBlocking(false);
    sc.register(selector, SelectionKey.OP_READ);
}
if (sk.isReadable()) {
    SocketChannel client = (SocketChannel) sk.channel();
    ByteBuffer buffer = ByteBuffer.allocate(2048);
    client.read(buffer);
    String content = "";
    while (client.read(buffer) > 0) {
        buffer.flip();
        content += charset.decode(buffer);
    }
    System.out.println(buffer);
    System.out.println("读到数据:" + content);
             
    buffer.flip();
    client.write(buffer);
    buffer.clear();
}

note

  • //do something 之后一定要将 iter 移除 seletedKeys 列表。
框架构成

nio select 框架最主要的实现点为非阻塞的 nio 和类似于 epoll 的操作系统函数封装(在 linux 中使用 epoll,在 windows 中使用 IOCP)。

nio 引入通道的概念,即 Channel,通道和流的最大区别是通道可读可写,而流要么只能读,要么只能写。同时,部分的通道实现了 SelectedChannel 接口,可以设置成非阻塞模式。

SelectionKey 是框架中非常重要的结构,他保存了通道的引用、绑定的selecotr对象以及这个通道对什么事件感兴趣的标志位。

Selector 类维护了一个注册集合准备完毕事件的 SelectionKey 集合。当有事件准备完成时,Selector 类会自动更新准备完毕事件的 SelectionKey 集合。如果调用 select 方法,会查找准备完毕事件的 SelectionKey 集合,如果大小不为0,则返回大小,否则阻塞。当 select 方法返回后,我们就可以根据准备完毕事件的 SelectionKey 集合进行对应的操作。

importance

  • 操作完成后,需要手动删除准备完毕事件的 SelectionKey 集合中元素,因为框架不会帮我们删除,否则 select 函数永远不会阻塞。
实现原理

Selector 类没有显示的构造方法,二是利用静态方法 open() 新建一个 Selector 类。追溯这个方法,方法如下:

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

真正的 open 方法是由 SelectorProvider.provider() 提供的,查看 SelectProvider.provider() 的源码如下:

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
        new PrivilegedAction<SelectorProvider>() {
            public SelectorProvider run() {
                if (loadProviderFromProperty())
                    return provider;
                if (loadProviderAsService())
                    return provider;
                provider = sun.nio.ch.DefaultSelectorProvider.create();
                return provider;
            }
        });
    }
}

最关键的一句话为 provider = sun.nio.ch.DefaultSelectorProvider.create();,这个表明在前面两种情况都不满足(没有这个服务或者没有设置一个系统变量)的情况下,返回一个默认的提供者。

查看 Java 官方 github,DefaultSelectorProvider 定义在包 sun.nio.ch 中。Java 核心库按照操作系统的区别,分别实现了不同操作系统的 DefaultSelectorProvider,以 solaris 为例,create() 函数的具体形式为:

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

以上代码根据不同的系统以反射的形式加载自己想要的 provider,以 linux 为例,加载的类形式如下:

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

可见,Selecotr 的 open() 方法实际上是新建了一个 EPollSelectorImpl 对象。我们继续看 EPollSelectorImpl 对象。EpollSelectorImpl 对象,继承自 SelectorImpl,SelectorImpl 部分源代码如下:

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.net.SocketException;
import java.util.*;


/**
 * Base Selector implementation class.
 */

public abstract class SelectorImpl extends AbstractSelector
{

    // The set of keys with data ready for an operation
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    protected HashSet<SelectionKey> keys;

    // Public views of the key sets
    private Set<SelectionKey> publicKeys;             // Immutable
    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

    public Set<SelectionKey> keys() {
        if (!isOpen() && !Util.atBugLevel("1.4"))
            throw new ClosedSelectorException();
        return publicKeys;
    }

    public Set<SelectionKey> selectedKeys() {
        if (!isOpen() && !Util.atBugLevel("1.4"))
            throw new ClosedSelectorException();
        return publicSelectedKeys;
    }

    protected abstract int doSelect(long timeout) throws IOException;

    private int lockAndDoSelect(long timeout) throws IOException {
        synchronized (this) {
            if (!isOpen())
                throw new ClosedSelectorException();
            synchronized (publicKeys) {
                synchronized (publicSelectedKeys) {
                    return doSelect(timeout);
                }
            }
        }
    }

    public int select(long timeout)
        throws IOException
    {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

    public int select() throws IOException {
        return select(0);
    }

    public int selectNow() throws IOException {
        return lockAndDoSelect(0);
    }

    public void implCloseSelector() throws IOException {
        wakeup();
        synchronized (this) {
            synchronized (publicKeys) {
                synchronized (publicSelectedKeys) {
                    implClose();
                }
            }
        }
    }

    protected abstract void implClose() throws IOException;

    public void putEventOps(SelectionKeyImpl sk, int ops) { }

    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }

    protected abstract void implRegister(SelectionKeyImpl ski);

    void processDeregisterQueue() throws IOException {
        // Precondition: Synchronized on this, keys, and selectedKeys
        Set<SelectionKey> cks = cancelledKeys();
        synchronized (cks) {
            if (!cks.isEmpty()) {
                Iterator<SelectionKey> i = cks.iterator();
                while (i.hasNext()) {
                    SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                    try {
                        implDereg(ski);
                    } catch (SocketException se) {
                        throw new IOException("Error deregistering key", se);
                    } finally {
                        i.remove();
                    }
                }
            }
        }
    }

    protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;

    abstract public Selector wakeup();

}

从这里我们可以看到 register() 函数会将 selectionKey 注册到 keys,并且调用 implRegister(SelectionKeyImpl ski) 这个抽象函数,同理 select() 也会调用 doSelect(long time) 这个抽象函数,还有一个单独的 wakeup() 这个抽象函数。

EPollSelectorImpl 主要实现了 SelectorImpl 的抽象函数,部分代码如下:

import java.io.IOException;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.util.*;
import sun.misc.*;

/**
 * An implementation of Selector for Linux 2.6+ kernels that uses
 * the epoll event notification facility.
 */
class EPollSelectorImpl
    extends SelectorImpl
{

    // File descriptors used for interrupt
    protected int fd0;
    protected int fd1;

    // The poll object
    EPollArrayWrapper pollWrapper;

    // Maps from file descriptors to keys
    private Map<Integer,SelectionKeyImpl> fdToKey;

    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for interrupt triggering and clearing
    private final Object interruptLock = new Object();
    private boolean interruptTriggered = false;

    protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

    /**
     * Update the keys whose fd's have been selected by the epoll.
     * Add the ready keys to the ready queue.
     */
    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }


    protected void implRegister(SelectionKeyImpl ski) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        int fd = Integer.valueOf(ch.getFDVal());
        fdToKey.put(fd, ski);
        pollWrapper.add(fd);
        keys.add(ski);
    }

    public void putEventOps(SelectionKeyImpl ski, int ops) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        pollWrapper.setInterest(ch.getFDVal(), ops);
    }

    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                pollWrapper.interrupt();
                interruptTriggered = true;
            }
        }
        return this;
    }
}

最主要的结构为 EPollArrayWrapper,这个结构部分源代码如下:

package sun.nio.ch;

import java.io.IOException;
import java.security.AccessController;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import sun.security.action.GetIntegerAction;

/**
 * Manipulates a native array of epoll_event structs on Linux:
 *
 * typedef union epoll_data {
 *     void *ptr;
 *     int fd;
 *     __uint32_t u32;
 *     __uint64_t u64;
 *  } epoll_data_t;
 *
 * struct epoll_event {
 *     __uint32_t events;
 *     epoll_data_t data;
 * };
 *
 * The system call to wait for I/O events is epoll_wait(2). It populates an
 * array of epoll_event structures that are passed to the call. The data
 * member of the epoll_event structure contains the same data as was set
 * when the file descriptor was registered to epoll via epoll_ctl(2). In
 * this implementation we set data.fd to be the file descriptor that we
 * register. That way, we have the file descriptor available when we
 * process the events.
 */

class EPollArrayWrapper {
    // EPOLL_EVENTS
    private static final int EPOLLIN      = 0x001;

    // opcodes
    private static final int EPOLL_CTL_ADD      = 1;
    private static final int EPOLL_CTL_DEL      = 2;
    private static final int EPOLL_CTL_MOD      = 3;

    // Miscellaneous constants
    private static final int SIZE_EPOLLEVENT  = sizeofEPollEvent();
    private static final int EVENT_OFFSET     = 0;
    private static final int DATA_OFFSET      = offsetofData();
    private static final int FD_OFFSET        = DATA_OFFSET;
    private static final int OPEN_MAX         = IOUtil.fdLimit();
    private static final int NUM_EPOLLEVENTS  = Math.min(OPEN_MAX, 8192);

    // Special value to indicate that an update should be ignored
    private static final byte  KILLED = (byte)-1;

    // Initial size of arrays for fd registration changes
    private static final int INITIAL_PENDING_UPDATE_SIZE = 64;

    // maximum size of updatesLow
    private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged(
        new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));

    // The fd of the epoll driver
    private final int epfd;

     // The epoll_event array for results from epoll_wait
    private final AllocatedNativeObject pollArray;

    // Base address of the epoll_event array
    private final long pollArrayAddress;

    // The fd of the interrupt line going out
    private int outgoingInterruptFD;

    // The fd of the interrupt line coming in
    private int incomingInterruptFD;

    // The index of the interrupt FD
    private int interruptedIndex;

    // Number of updated pollfd entries
    int updated;

    // object to synchronize fd registration changes
    private final Object updateLock = new Object();

    // number of file descriptors with registration changes pending
    private int updateCount;

    // file descriptors with registration changes pending
    private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];

    // events for file descriptors with registration changes pending, indexed
    // by file descriptor and stored as bytes for efficiency reasons. For
    // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
    // least) then the update is stored in a map.
    private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
    private Map<Integer,Byte> eventsHigh;

    // Used by release and updateRegistrations to track whether a file
    // descriptor is registered with epoll.
    private final BitSet registered = new BitSet();


    EPollArrayWrapper() throws IOException {
        // creates the epoll file descriptor
        epfd = epollCreate();

        // the epoll_event array passed to epoll_wait
        int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();

        // eventHigh needed when using file descriptors > 64k
        if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
            eventsHigh = new HashMap<>();
    }

    void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;
        incomingInterruptFD = fd0;
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }

    void putEventOps(int i, int event) {
        int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;
        pollArray.putInt(offset, event);
    }

    void putDescriptor(int i, int fd) {
        int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
        pollArray.putInt(offset, fd);
    }

    int getEventOps(int i) {
        int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;
        return pollArray.getInt(offset);
    }

    int getDescriptor(int i) {
        int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
        return pollArray.getInt(offset);
    }

    /**
     * Returns {@code true} if updates for the given key (file
     * descriptor) are killed.
     */
    private boolean isEventsHighKilled(Integer key) {
        assert key >= MAX_UPDATE_ARRAY_SIZE;
        Byte value = eventsHigh.get(key);
        return (value != null && value == KILLED);
    }

    /**
     * Sets the pending update events for the given file descriptor. This
     * method has no effect if the update events is already set to KILLED,
     * unless {@code force} is {@code true}.
     */
    private void setUpdateEvents(int fd, byte events, boolean force) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            if ((eventsLow[fd] != KILLED) || force) {
                eventsLow[fd] = events;
            }
        } else {
            Integer key = Integer.valueOf(fd);
            if (!isEventsHighKilled(key) || force) {
                eventsHigh.put(key, Byte.valueOf(events));
            }
        }
    }

    /**
     * Returns the pending update events for the given file descriptor.
     */
    private byte getUpdateEvents(int fd) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            return eventsLow[fd];
        } else {
            Byte result = eventsHigh.get(Integer.valueOf(fd));
            // result should never be null
            return result.byteValue();
        }
    }

    /**
     * Update the events for a given file descriptor
     */
    void setInterest(int fd, int mask) {
        synchronized (updateLock) {
            // record the file descriptor and events
            int oldCapacity = updateDescriptors.length;
            if (updateCount == oldCapacity) {
                int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
                int[] newDescriptors = new int[newCapacity];
                System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
                updateDescriptors = newDescriptors;
            }
            updateDescriptors[updateCount++] = fd;

            // events are stored as bytes for efficiency reasons
            byte b = (byte)mask;
            assert (b == mask) && (b != KILLED);
            setUpdateEvents(fd, b, false);
        }
    }

    /**
     * Add a file descriptor
     */
    void add(int fd) {
        // force the initial update events to 0 as it may be KILLED by a
        // previous registration.
        synchronized (updateLock) {
            assert !registered.get(fd);
            setUpdateEvents(fd, (byte)0, true);
        }
    }

    /**
     * Remove a file descriptor
     */
    void remove(int fd) {
        synchronized (updateLock) {
            // kill pending and future update for this file descriptor
            setUpdateEvents(fd, KILLED, false);

            // remove from epoll
            if (registered.get(fd)) {
                epollCtl(epfd, EPOLL_CTL_DEL, fd, 0);
                registered.clear(fd);
            }
        }
    }

    /**
     * Close epoll file descriptor and free poll array
     */
    void closeEPollFD() throws IOException {
        FileDispatcherImpl.closeIntFD(epfd);
        pollArray.free();
    }

    int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }

    /**
     * Update the pending registrations.
     */
    private void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            while (j < updateCount) {
                int fd = updateDescriptors[j];
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;

                if (events != KILLED) {
                    if (isRegistered) {
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                        epollCtl(epfd, opcode, fd, events);
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
    }

    // interrupt support
    private boolean interrupted = false;

    public void interrupt() {
        interrupt(outgoingInterruptFD);
    }

    public int interruptedIndex() {
        return interruptedIndex;
    }

    boolean interrupted() {
        return interrupted;
    }

    void clearInterrupted() {
        interrupted = false;
    }

    static {
        IOUtil.load();
        init();
    }

    private native int epollCreate();
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    private native int epollWait(long pollAddress, int numfds, long timeout,
                                 int epfd) throws IOException;
    private static native int sizeofEPollEvent();
    private static native int offsetofData();
    private static native void interrupt(int fd);
    private static native void init();
}

该结构利用 JNI 编程,将 epoll 的系统调用封装成 native 函数,同时保存就绪队列等信息,作为 Java 层面和系统层面的桥梁,以实现 epoll 的封装,并构成 Java 层面的多路复用 I/O 框架。

上一篇 下一篇

猜你喜欢

热点阅读