Java IO模型:从BIO到NIO非阻塞Socket再到多路复
(一)BIO阻塞模型
BIO简单来说就是”one thread per connection“,也是最容易理解的一种IO模型。
如下的程序,我们只启动一个线程负责先阻塞的accept、然后从接受的socket里阻塞的read和write,读写完成之后才能继续处理下一个连接。意味着本服务器每次只能服务一个连接,其他连接都在os的accept队列里排队,等待我们的应用程序去accept然后处理。
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BIOSocketServer {
private static Logger logger = LoggerFactory.getLogger(BIOSocketServer.class);
public static void main(String[] args) {
ServerSocket server = null;
try {
server = new ServerSocket();
server.bind(new InetSocketAddress("localhost",9090), 100); //侦听本地9090端口,backlog设置为100
logger.info("单线程BIO服务启动...");
while(true) {
Socket client = server.accept(); //阻塞,等待可以接受一个客户端连接
InputStream is = client.getInputStream(); //读取的输入流
byte[] readBuf = new byte[4*1024];
while(is.read(readBuf)>0) { //read没返回-1说明流没有读完。没有数据读则会一直阻塞。
String msg = new String(readBuf);
logger.info("receive from client[{}:{}], msg:{}"
,client.getInetAddress().toString()
,client.getPort()
,msg);
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(null!=server)
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
开两个telnet命令行窗口,第一个可以连上然后输入也可以被服务端read到并输出,第二个可以连上(os的tcp协议栈完成了连接建立并放入了accept队列),但不会被应用程序accept。因为第一个连接没有读写完,代码在while(is.read(readBuf)>0){...}
这个循环里,不会走到下一次外层的while循环accept的,这时候如果关闭第一个窗口,那么第二个窗口的连接就会被accept。
上面的程序的阻塞体现在两个地方
-
ServerSocket.accept()
从os的accept队列里拿完成握手的连接,拿不到没有就阻塞在这里。 -
InputStream.read(byte[])
我们这里使用的是socket.getInputStream()
,read方法也就是从socket的输入流里读取数据到我们的缓冲byte数组里,读不到就阻塞在这。返回-1就说明流关闭读完了。
我们可以看到,上面的IO模型存在很大的局限性,没法同时服务多个连接,就算改成多线程去处理各个连接的读写以完成服务多个连接的需求,但"one thread per connection"的设计也使得在处理大量连接的时候会创建同样多的线程,os维护线程的开销的能力和CPU多个线程之间进行切换的开销的能力是有极限的。硬件的算力最终都投入在了维护多线程切换而不是有效利用到执行业务逻辑本身。
所以引出了下面的NIO非阻塞socket和多路复用器。
(二)NIO非阻塞Socket
与java BIO对应的是java NIO,但这里先说说非阻塞socket,讲讲其在BIO也就是阻塞式socket之上的改进和特点,然后针对其不足,再引出多路复用器。
先基于java.nio.*
包中提供的ServerSocketChannel
和SocketChannel
开发一个单线程的非阻塞的socket服务器,虽然是单线程但由于线程步阻塞所以也可以同时支持多个客户端。加深对非阻塞Socket的理解。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 单线程非阻塞的socket server
* 不用多路复用器,可以同时支持多个客户端
* */
public class MySocketServer1 {
private static Logger logger = LoggerFactory.getLogger(MySocketServer1.class);
private static LinkedList<SocketChannel> clients = new LinkedList<>(); //用于存放连接上来的SocketChannel
public static void main(String[] args) {
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(9090), 100);
server.configureBlocking(false); //设置ServerSocketChannel为非阻塞
ByteBuffer readBuffer = ByteBuffer.allocateDirect(4*1024); //分配4KB堆外内存作为读缓冲
logger.info("single thread non-blocking socket server start...");
while(true){
//处理新练上来的连接
SocketChannel newClient = server.accept(); //非阻塞,立即返回null或者具体的SocketChannel
if(null != newClient) {
newClient.configureBlocking(false);
logger.info("accept a client socket, port=" + newClient.socket().getPort());
clients.add(newClient);
}
//处理正连着的连接
for(SocketChannel client : clients) {
int count = client.read(readBuffer); //从内核的socket缓冲读count个字节数据到用户态的read buffer
if(count > 0) {
readBuffer.flip(); //readBuffer内部的limit指针指向当前最后写入的位置,position指向0起始位
byte[] bytes = new byte[readBuffer.limit()];
readBuffer.get(bytes);//把read buffer中当前可用数据存到bytes数组
String msg = new String(bytes);
logger.info("receive from client[{}:{}], msg:{}" , client.socket().getInetAddress(), client.socket().getPort(), msg);
readBuffer.clear();
ByteBuffer writeBuffer = ByteBuffer.wrap(("echo from server:"+msg).getBytes());
client.write(writeBuffer);
}
}
Thread.sleep(500); //防止CPU空转
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
简单来说,上面socket服务器的思路是,启动一个spin线程,不断的去非阻塞的从侦听本地端口的ServerSocket中accept接受socket,接受的socket也设置为非阻塞并保存到一个socket列表中,然后还是由这个线程处理当前socket列表中的也就是正连着本服务器的连接:从socket中读数据到read buffer,从write buffer中将数据写到socket给客户端。
上面的程序的优点是,只用一个线程就可以同时处理多个客户端连接,这个线程非阻塞的轮流为这些个客户端连接服务,不会出现BIO中如果是单线程必须服务完一个客户端连接待其关闭之后才能服务其他客户端的现象。
而缺点是,这个单线程必须不断的循环调用server.accept()和client.read()来确认是否有新连接以及是否有连接产生了可读数据。当空闲时,产生大量的CPU空转,为此上面的程序不得不Thread.sleep(500)
为了解决这个问题,随着操作系统内核的不断发展,提供了多路复用器这种IO模型,在linux中其代表实现方式为epoll,event poll,也叫事件轮询。
(三)多路复用器NIO模型
上面的非阻塞socket模型问题在于线程要不断的在应用程序侧循环调用server.accept()和client.read()来确认是否有新连接以及是否有连接产生了可读数据。如果应用程序能够预先知道当前有哪些连接的IO就绪、包括accept、read、write等等,那么就可以只处理这些连接的IO操作了,于是有了下面的多路复用器的模型。这里仍然使用单线程实现。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 单线程带多路复用器NIO服务器
* */
public class NioSelectorSocketServer {
private static Logger logger = LoggerFactory.getLogger(NioSelectorSocketServer.class);
private static final int backlog = 100;
private static Selector selector;
private static ByteBuffer readBuffer = ByteBuffer.allocateDirect(4*1024); //分配4KB堆外内存作为读缓冲
public static void main(String[] args) {
try {
selector = Selector.open(); //epoll_create
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress("localhost", 9090), backlog);
server.configureBlocking(false);//server socket非阻塞
server.register(selector, SelectionKey.OP_ACCEPT); //epoll_ctl
logger.info("single thread multiplexer NIO socket server start...");
while(true) {
selector.select(); //epoll_wait 阻塞直到有连接IO就绪(这里的连接指的是channel,包括ServerSocketChannel和SocketChannel)
Set<SelectionKey> readyKeys = selector.selectedKeys(); // 本次就绪的连接的key
for(SelectionKey key : readyKeys) { // 遍历这些就绪的连接key
if(key.isAcceptable()) { //是server socket,accept就绪
SocketChannel client = ((ServerSocketChannel)key.channel()).accept(); //执行accept,接受连接
client.configureBlocking(false); //新连上来的连接设置为非阻塞
client.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE); //epoll_ctl 新连上来的连接向selector注册读写事件
logger.info("accept a client socket adress:{}, port:{}" ,client.socket().getInetAddress().toString(), client.socket().getPort());
}
if(key.isReadable()) { //是client socket,读就绪
SocketChannel client = (SocketChannel)key.channel();
int count = client.read(readBuffer);
if(count > 0) { //通过read buffer把数据从socket读出来
readBuffer.flip();
byte[] bytes= new byte[readBuffer.limit()];
readBuffer.get(bytes);
String msg = new String(bytes);
logger.info("receive form client[{}:{}] ,msg:{}"
, client.socket().getInetAddress().toString()
, client.socket().getPort()
, msg);
readBuffer.clear();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
关于多路复用器epoll的简介,可以参考epoll,解决C10K问题的关键
这里说下上面程序的思路:整体上来说是要创建一个selector,然后一个单线程spin,它要负责处理从selector中select()出来的两类就绪channel对应的SelectionKey,一种是server socket的accept就绪,另一种是client socket的读写就绪事件。整个spin阻塞在selector.select()上,避免了线程无用的循环,只在真正有连接IO就绪的时候执行循环体。且通过selector.selectedKeys()从注册到selector上的所有连接之中找出当前IO就绪的连接,只遍历这些少量连接,而不是遍历全量连接。