NIO基础
不多BB开门见山
- BIO是同步阻塞式的IO,ServerSocket.accept(),InputStream.read(),OutputStream.write()都会阻塞线程,使得一整个线程发生阻塞而无法去处理其他的工作,在这种模式下服务器要为每个客户端的请求都要创建一个线程,所以这种模式无法满足高并发,高性能的场景
- NIO是同步非阻塞式的IO,相对BIO优点就是不阻塞线程,客户端连接,读写操作全是异步进行的,适合高性能,高负载的网络服务器
NIO的核心概念:
-
Channel:在我们传统的io中主要是使用流也就是Stream,Stream是单向的并且是阻塞的,只能是输出或者是输入,而Channel是双向的,可以输入和输出同时进行,并且拥有两种模式:阻塞和非阻塞。利用Channel的非阻塞这样我们就可以使用一个线程去处理多个Channel的多个读写,Channel主要有以下几种类型
1、FileChannel:从文件读取数据的
2、DatagramChannel:读写 UDP 网络协议数据
3、SocketChannel:读写 TCP 网络协议数据
4、ServerSocketChannel:可以监听 TCP 连接 -
Selector: Selector叫做多路复用器,是用于监控多条Channel的。Selector可以轮询注册在其上的Channel,当Channel发生读或者写事件时,Channel就会被轮询出来,可以通过SelectionKey获取这些Channel
-
Buffer:Buffer是NIO的读写的中转池,它提供了对数据的结构化访问以及维护读写位置等信息。NIO的读和写都是要从Buffer中进行,所有数据都通过 Buffer 对象处理,所以,输出操作时不会将字节直接写入到 Channel 中,而是将数据写入到 Buffer 中;同样,输入操作也不会从 Channel 中读取字节,而是将数据从 Channel 读入 Buffer,再从 Buffer 获取这个字节。
Buffer
Buffer实质是一个字节数组,包含这些类型:CharBuffer、DoubleBuffer、IntBuffer、LongBuffer、ByteBuffer、ShortBuffer、FloatBuffer
buffer的结构:
- position:读模式时是读取数据时的起始位置,写模式时会记录写入数据的位置,从写模式切换到读模式时,置为 0此时limit会置为刚刚position的位置帮我们记录写了多少数据,为什么写模式切到读模式position要置为0呢,原因是写模式position会处于数据的最后一个位置,如果读模式不置为零的话是从写入数据的最后一个位置开始读到limit的位置,所以会读不到任何数据;
- limit:代表最多能写入或者读取多少单位的数据,写模式下等于最大容量 capacity,代表写的最大容量是多少;从写模式切换到读模式时,等于position,然后再将 position 置为 0,所以,读模式下,limit 表示最大可读取的数据量,position 代表从哪里读起,这个值与实际写入的数量相等。
- capacity:表示 buffer 容量,创建时分配。
Buffer读写模式的切换
Buffer利用flip() 进行读写模式的切换,在写模式下position 会记录写入的字节的位置,limit会等于capacity表示允许最大可写容量,读的时候调用flip(),Buffer切换至读模式,limit会被至为position的位置表示最大的可读容量,而position会被至为0,一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。
compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
操作buffer时的步骤为:
1.写入数据到 Buffer;
2.调用 flip() 方法;
3.从 Buffer 中读取数据;
4.调用 clear() 方法或者 compact() 方法。
5.当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 .flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。
Selector
Selector通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。通道和缓冲区的机制,使得 Java NIO 实现了同步非阻塞 IO 模式,在此种方式下,用户进程发起一个 IO 操作以后便可返回做其它事情,而无需阻塞地等待 IO 事件的就绪,但是用户进程需要时不时的询问 IO 操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的 CPU 资源浪费。鉴于此,需要有一个机制来监管这些 IO 事件,如果一个 Channel 不能读写(返回 0),我们可以把这件事记下来,然后切换到其它就绪的连接(channel)继续进行读写。在 Java NIO 中,这个工作由 selector 来完成,
Selector 是一个对象,它可以接受多个 Channel 注册,监听各个 Channel 上发生的事件,并且能够根据事件情况决定 Channel 读写。这样,通过一个线程可以管理多个 Channel,从而避免为每个 Channel 创建一个线程,节约了系统资源。如果你的应用打开了多个连接(Channel),但每个连接的流量都很低,使用 Selector 就会很方便。
要使用 Selector,就需要向 Selector 注册 Channel,然后调用它的 select() 方法。这个方法会一直阻塞到某个注册的通道有事件就绪,这就是所说的轮询。一旦这个方法返回,线程就可以处理这些事件。
Selector说白了就是能帮我们把注册在上面的各个通道并且是我们感兴趣的事件轮询出来,这个轮询不是普通的循环遍历,而是一种系统级别的操作,性能比普通的轮询高,所以轮询通道连接最好不要自己用代码实现,而是使用多路复用器
一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统BIO一连接一线程模型,性能、弹性伸缩能力和可靠性都得到了极大的提升.NIO编码
整个NIO的步骤如下:server端
public class ChatServer {
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private ServerSocketChannel server;
private Selector selector;
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
private Charset charset = Charset.forName("UTF-8");
private int port;
public ChatServer() {
this(DEFAULT_PORT);
}
public ChatServer(int port) {
this.port = port;
}
private void start() {
try {
//打开ServerSocketChannel,这是所有客户端连接的父管道,所有的客户端连接都要通过它
server = ServerSocketChannel.open();
//设置非阻塞通道
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(port));
//多路复用器
selector = Selector.open();
//将ServerSocketChannel注册到多路复用器上,并监听客户端连接事件
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("启动服务器, 监听端口:" + port + "...");
while (true) {
//表示没有就绪的通道,跳过本次循环
if (selector.selectNow()==0){//非阻塞监听,也可以设置为阻塞监听:selector.select
continue;
}
//selector.select如果设置为阻塞监听就不需要上面的if判断,因为设置阻塞监听如果通道没有事件,它会将自己阻塞住,有事件才玩往下走
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
// 处理被触发的事件
handles(key);
}
//清理通道,避免事件重复处理
selectionKeys.clear();
//另一种李大佬书上的写法
/** selector.select();
selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
handles(iterator.next());
//清理事件,避免事件重复处理
iterator.remove();
}
**/
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(selector);
}
}
private void handles(SelectionKey key) throws IOException {
// ACCEPT事件 - 和客户端建立了连接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//将客户端连接进来的通道
SocketChannel client = server.accept();
client.configureBlocking(false);
//注册通道的读监听事件
client.register(selector, SelectionKey.OP_READ);
System.out.println(getClientName(client) + "已连接");
}
// READ事件 - 客户端发送了消息
else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
//读取客户端消息
String fwdMsg = receive(client);
if (fwdMsg.isEmpty()) {
// 取消这个通道的监听事件
key.cancel();
//通知多路复用器重新调整监听,selector.wakeup()在selector是阻塞模式的时候可以用,可以唤醒阻塞
//selector.wakeup();
} else {
System.out.println(getClientName(client) + ":" + fwdMsg);
//消息转发
forwardMessage(client, fwdMsg);
// 检查用户是否退出
if (readyToQuit(fwdMsg)) {
key.cancel();
//通知多路复用器重新调整监听,selector.wakeup()在selector是阻塞模式的时候可以用,可以唤醒阻塞
//selector.wakeup();
System.out.println(getClientName(client) + "已断开");
}
}
}
}
//将其他客户的消息转发给客户端
private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
for (SelectionKey key: selector.keys()) {
Channel connectedClient = key.channel();
if (connectedClient instanceof ServerSocketChannel) {
continue;
}
//key.isValid()判断SelectionKey是否有效,
//client.equals(connectedClient) 判断消息不是这个客户端发的,避免自己给自己发消息
if (key.isValid() && !client.equals(connectedClient)) {
wBuffer.clear();
//写消息到Buffer中
wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));
//切换到读状态
wBuffer.flip();
//判断Buffer中是否还残存数据
while (wBuffer.hasRemaining()) {
//将Buffer的数据写入通道
((SocketChannel)connectedClient).write(wBuffer);
}
}
}
}
//接收客户端消息
private String receive(SocketChannel client,SelectionKey key) throws IOException {
try {
rBuffer.clear();
//将通道的消息读到缓冲区
while (client.read(rBuffer) > 0) ;
//切换到读模式
rBuffer.flip();
}catch (Exception e){
key.cancel();
client.socket().close();
client.close();
System.out.println(e.toString());
return StringUtils.EMPTY;
}
//从缓冲区读数据
return String.valueOf(charset.decode(rBuffer));
}
private String getClientName(SocketChannel client) {
return "客户端[" + client.socket().getPort() + "]";
}
private boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer(8888);
chatServer.start();
}
}
client
public class ChatClient {
private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
private static final int DEFAULT_SERVER_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private String host;
private int port;
private SocketChannel client;
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
private Selector selector;
private Charset charset = Charset.forName("UTF-8");
public ChatClient() {
this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
}
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public boolean readyToQuit(String msg) {
return QUIT.equals(msg);
}
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void start() {
try {
client = SocketChannel.open();
client.configureBlocking(false);
selector = Selector.open();
//注册连接监听事件
client.register(selector, SelectionKey.OP_CONNECT);
client.connect(new InetSocketAddress(host, port));
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
handles(key);
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException e) {
// 用户正常退出
} finally {
close(selector);
}
}
/*
由于我们使用的SocketChannel处于非阻塞模式,当调用connect()方法时,调用会立即返回,但是连接的过程还在进行,
需要后续调用finishConnect()方法来完成连接过程。在连接过程已经启动,但尚未完成之前,
isConnectionPending()会返回true,这就是我们此时在检测的状态。如果连接未能正常创建,
调用finishConnect()则会抛出IOException异常,标志着连接失败。
*/
private void handles(SelectionKey key) throws IOException {
// CONNECT事件 - 连接就绪事件
if (key.isConnectable()) {
SocketChannel client = (SocketChannel) key.channel();
if (client.isConnectionPending()) {
//调用finishConnect()方法来完成连接过程
client.finishConnect();
// 处理用户的输入
new Thread(new UserInputHandler(this)).start();
}
client.register(selector, SelectionKey.OP_READ);
}
// READ事件 - 服务器转发消息
else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
String msg = receive(client);
if (msg.isEmpty()) {
close(selector);
} else {
System.out.println(msg);
}
}
}
public void send(String msg) throws IOException {
if (msg.isEmpty()) {
return;
}
wBuffer.clear();
wBuffer.put(charset.encode(msg));
wBuffer.flip();
while (wBuffer.hasRemaining()) {
client.write(wBuffer);
}
// 检查用户是否准备退出
if (readyToQuit(msg)) {
close(selector);
}
}
private String receive(SocketChannel client) throws IOException {
rBuffer.clear();
while (client.read(rBuffer) > 0);
rBuffer.flip();
return String.valueOf(charset.decode(rBuffer));
}
class UserInputHandler implements Runnable {
private ChatClient chatClient;
public UserInputHandler(ChatClient chatClient) {
this.chatClient = chatClient;
}
@Override
public void run() {
try {
// 等待用户输入消息
BufferedReader r =
new BufferedReader(new InputStreamReader(System.in));
while (true) {
String input = r.readLine();
// 向服务器发送消息
chatClient.send(input);
// 检查用户是否准备退出
if (chatClient.readyToQuit(input)) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ChatClient client = new ChatClient("127.0.0.1", 8888);
client.start();
}
}
image.png
image.png
image.png
image.png