NIO底层原理
IO基本概念
Linux的内核将所有外部设备都可以看做一个文件来操作,而对一个文件的读写都是通过内核提供的系统调用,内核给我们返回一个文件描述符file descriptor,文件描述符是一个数组索引,指向内核维护的文件表格,应用程序对文件的读写就是通过文件描述符的操作完成。一个基本的IO,它会涉及到两个系统对象,一个是调用这个IO的进程对象,另一个就是系统内核,当一个read操作发生时,它会经历如下阶段:
- 进程对象通过read系统调用向内核发起读请求
- 内核向硬件发送读指令,并等待读就绪。
- 内核把将要读取的数据复制到内核缓冲区
- 数据从内核缓冲区拷贝到用户进程空间中
同步与异步
所谓同步就是发出一个调用后,在没有得到结果之前该调用就不返回,就是调用者主动等待这个调用的结果。 异步则相反,调用发出后,这个调用就直接返回了,调用者不会立刻得到结果,而是在调用发出后,被调用者通过回调函数等方式来告知调用者。
举个通俗的例子:
你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
本质上,访问数据的方式,同步需要当前线程读写数据,在读写数据的过程中数据可能还没ready,可能会阻塞,而异步io则是操作系统等数据ready之后会通知进程数据好了,可以直接读了
阻塞io模型:
在缺省情形下,所有文件操作都是阻塞的,在进程空间中调用recvfrom,其系统调用直到数据报到达且被拷贝到应用进程的缓冲区中或者发生错误才返回,期间一直在等待。我们就说进程在从调用recvfrom开始到它返回的整段时间内是被阻塞的。
image.png当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段,准备数据,对于network IO, 很多时候数据在一开始还没有到达,比如还没有收到一个完整的UDP包,这个时候kernel就要等待足够的数据到来。而用户进程这边,整个进程会被阻塞,当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block状态,重新运行起来。
非阻塞IO模型
进程把一个套接口设置为非阻塞是在通知内核:当所请求的IO操作不能满足要求时,不把本进程投入睡眠,而是返回一个错误。也就是说当数据没有到达时并不等待,而是以一个错误返回。
image.png
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作,一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
IO复用模型
linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,select/poll会不断轮询所负责的所有socket,可以侦测许多fd是否就绪,但select和poll是顺序扫描fd是否就绪,并且支持的fd数量有限。linux还提供了epoll系统调用,它是基于事件驱动的方式,而不是顺序扫描,当某个socket有数据到达了,可以直接通知用户进程,而不需要顺序轮询扫描,提高了效率。
image.png
当进程调用了select,整个进程会被block,同时,kernel会监视所有select负责的socket,当任何一个socket的数据准备好了,select就会返回,这个图和阻塞IO的图其实并没有多大区别,事实上,还更差一点,因为这里需要使用两个System call,select和recvFrom,而blocking io只调用了一个system call(recvfrom),但是select的好处在与它可以同时处理多个connection,(如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
信号驱动异步IO模型
首先开启套接口信号驱动I/O功能, 并通过系统调用sigaction安装一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据报准备好被读时,就为该进程生成一个SIGIO信号。随即可以在信号处理程序中调用recvfrom来读数据报,井通知主循环数据已准备好被处理中。也可以通知主循环,让它来读数据报。
image.png异步I/O模型
告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核拷贝到用户自己的缓冲区)通知用户进程,这种模型和信号驱动模型的主要区别是:信号驱动IO:由内核通知我们何时可以启动一个IO操作,异步IO模型:由内核通知我们IO操作何时完成
image.png
用户进程发起read操作之后,立刻就可以开始去做其他的事了,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,不会对用户进程产生任何block,然后,kernel会等待数据准备完成,然后再将数据拷贝到用户进程内存,当着一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作已经完成。
小结 前面几种都是同步IO,在内核数据copy到用户空间都是阻塞的。最后一种是异步IO,通过API把IO操作交给操作系统处理,当前进程不关心具体IO的实现,通过回调函数或者信号量通知当前进程直接对IO返回结果进行处理。一个IO操作其实分成了两个步骤,发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二步是否阻塞,如果实际的IO读写阻塞请求进程,那就是同步IO,因此前四种都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那就是非阻塞IO.
举个简单例子来说明:
有A,B,C,D四个人在钓鱼:
A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B没事就睡觉,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。
A: 阻塞IO, B: 非阻塞IO: C: NIO D: AIO
AIO BIO NIO
- AIO异步非阻塞IO,AIO方式适用于连接数目多且连接比较长的架构,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
- NIO同步非阻塞IO,适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
- BIO同步阻塞IO,适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
Java对BIO、NIO、AIO的支持:
Java BIO: 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善
JAVA NIO: 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求才启动一个线程进行处理。
Java AIO: 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的IO请求都是由OS先完成了再通知服务器应用去启动线程进行处理。
Selector
Nio中的selector具体是一个什么样的东西?想想一个场景:在一个养鸡场,有这么一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来,如果鸡场的负责人想知道情况,只需要询问那个人即可。在这里,这个人就相当Selector,每个鸡笼相当于一个SocketChannel,每个线程通过一个Selector管理多个SocketChannel,
image.png
为了实现Selector管理多个SocketChannel,必须将具体得socketChannel对象注册到Selector,并声明需要监听的事件,一共有四种事件:
1、connect:客户端连接服务端事件,对应值为SelectionKey.OPCONNECT(8)
2、accept:服务端接收客户端连接事件,对应值为SelectionKey.OPACCEPT(16)
3、read:读事件,对应值为SelectionKey.OPREAD(1)
4、write:写事件,对应值为SelectionKey.OPWRITE(4)
当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行处理。
服务端代码
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//将选择器注册到连接到的客户端信道,
//并指定该信道key值的属性为OP_READ,
//同时为该信道指定关联的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}
服务端操作过程:
- 创建ServerSocketChannel实例,并绑定指定端口
- 创建selector实例
- 将serverSocketChannel注册到selector,并指定事件OP_ACCEPT,最底层的socket通过channel和selector建立关联
- 如果没有准备好的socket,select方法会被阻塞一段时间
- 如果底层有socket已经准备好,selector的select方法会返回socket的个数,而且selectedKeys方法会返回socket对应的事件(connect, accept、read、write);
- 根据事件类型,进行不同的逻辑处理
在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件,如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。如果客户端A发送数据,会触发read事件,下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。
Selector实现原理
SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现,其中,Selector是整个NIO Socket的核心实现。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
SelectorProvider在windows和linux下有不同的实现,provider方法会返回对应的实现。
Selector是如何做到同时管理多个socket? Selector初始化时,会实例化PollWrapper、SelectionKeyImpl数组和Pipe。
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
pollWrapper用Unsafe类申请一块物理内存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。
image.png
image.png
pollWrapper提供了fdVal和event数据的相应操作,如添加操作通过Unsafe的putInt和putShort实现。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何实现的
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException {
synchronized (regLock) {
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
如果该channel和selector已经注册过,则直接添加事件和附件。
否则通过selector实现注册过程。
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski);
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
- 以当前channel和selector为参数,初始化SelectionKeyImpl对象selectionKeyImpl,并添加附件attachment,
- 如果当前channel的数量totalChannels等于SelectionImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
- 3、如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。
4、pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
5、k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
所以,不管serverSocketChannel,还是socketChannel,在selector注册的事件,最终都保存在pollArray中。
接着,再来看看selector中的select是如何实现一次获取多个有事件发生的channel的,底层由selector实现类的doSelect方法实现,如下:
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
其中 subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd。
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
执行 selector.select() ,poll0函数把指向socket句柄和事件的内存地址传给底层函数
1、如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回;
2、一旦有对应的事件发生,poll0方法就会返回;
3、processDeregisterQueue方法会清理那些已经cancelled的SelectionKey;
4、updateSelectedKeys方法统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用。
简单说,nio是依赖操作系统的实现,java并不能一个线程同时监听多个socket,
,在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。
epoll原理
epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。
三个epoll相关的系统调用:
-
int epoll_create(int size)
epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。 -
int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。 -
int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。
epoll内部实现大概如下:
- epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
- 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
- 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。