Selector.select()
Netty的底层依然是依赖于JDK的NIO . 开发NIO服务端的代码如下所示
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class Server {
// 缓冲区的大小
private static final int BUFFER_SIZE = 1024;
// 缓冲区
private static ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
// 选择器
private static Selector selector = null;
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080), 64);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
for (;;) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
handleKey(key);
iterator.remove();
}
}
} catch (Exception ignored) {
}
}
// 处理SelectionKey
private static void handleKey(SelectionKey key) throws IOException {
// 是否有连接进来
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
// SocketChannel通道的可读事件注册到Selector中
registerChannel(selector, socketChannel, SelectionKey.OP_READ);
// 连接成功 向Client打个招呼
if (socketChannel.isConnected()) {
buffer.clear();
buffer.put("I am Server...".getBytes());
buffer.flip();
socketChannel.write(buffer);
}
}
// 通道的可读事件就绪
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.clear(); // 清空缓冲区
// 读取数据
int len = 0;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println("Server读取的数据:" + new String(buffer.array(), 0, len));
}
}
if (len < 0) {
// 非法的SelectionKey 关闭Channel
socketChannel.close();
}
// SocketChannel通道的可写事件注册到Selector中
registerChannel(selector, socketChannel, SelectionKey.OP_WRITE);
}
// 通道的可写事件就绪
if (key.isWritable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.clear(); // 清空缓冲区
// 准备发送的数据
String message_from_server = "Hello,Client... " + socketChannel.getLocalAddress();
buffer.put(message_from_server.getBytes());
buffer.flip();
socketChannel.write(buffer);
System.out.println("Server发送的数据:" + message_from_server);
// SocketChannel通道的可写事件注册到Selector中
registerChannel(selector, socketChannel, SelectionKey.OP_READ);
}
}
private static void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
if (channel == null) {
return;
}
channel.configureBlocking(false);
channel.register(selector, ops);
}
}
本篇文章就来讲解下selector.select()的功能 .
个人感觉, 好多功能都是按照三部曲来实现的 1.生产一个冰箱 2.把大象装进冰箱 3.把大象从冰箱取出来
1.生产一个冰箱
在调用Selector.open()的时候, 底层会创建各种属性和数据结构,用于存储相关信息.
// 源码位置 java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
在Windows平台返回 WindowsSelectorImpl 对象 , 在Linux平台返回 EPollSelectorImpl 对象 . 这里以EPollSelectorImpl分析 .
在Windows平台 , 通过跟踪open()源码的方式, 看不到sun.nio.ch.EPollSelectorImpl 这个类, 可以在Linux平台 或者 直接下载JDK源码 或者互联网搜索EPollSelectorImpl都可以看到这个类 .
EPollSelectorImpl 继承 SelectorImpl , 在 EPollSelectorImpl 内部有个EPollArrayWrapper类 , EPollArrayWrapper内部就是关于epoll相关的操作 .
IO多路复用的实现方式有 select, poll, epoll . epoll 主要涉及三个方法: epoll_create, epoll_ctl, epoll_wait
图片.png 图片.png 图片.png个人认为, 要想学好Java, 依然要对C语言, 包括一些系统调用了解或熟悉.
在实例化EPollSelectorImpl的时候, 创建了 Set<SelectionKey> selectedKeys , Map<Integer,SelectionKeyImpl> fdToKey, byte[] eventsLow 或 Map<Integer,Byte> eventsHigh 等重要属性 .
我们不必在意这些属性'散落'在哪些类里, 我们更关注的是, 实例化EPollSelectorImpl的时候 会 创建一些集合等属性对象, 用于存储数据. 这就是在生产一个冰箱, 为后面存储数据使用.
而且还会创建一个堆外内存的pollArray对象, 这个对象用于接收内核返回的可读写的文件描述符. 因为在进行调用epoll_wait的时候, 需要给内核传递一个对象, 内核会将已经准备就绪的文件描述符填充到这个对象.
通过man epoll_wait查看
图片.png
第二个参数 struct epoll_event *events 是一个传出参数, 而pollArray对象就会传到这个参数上 .
所有与内核交互的对象, 必须是堆外内存的对象 .
2.把大象装进冰箱
在我们自己的代码中 会通过 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) 注册感兴趣的事件 .
文章后面会有一个函数之间调用的总图
// 源码位置 EPollSelectorImpl#implRegister
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
会将<fd, SelectionKeyImpl>的对应关系存储到 Map<Integer,SelectionKeyImpl> fdToKey 集合中, 假如我们有个6号文件描述符 fd=6, 把它存储到 fdToKey中, 即<6, SelectionKeyImpl>这样的关系 . 当6号文件描述符有数据进来的时候, 调用epoll_wait的时候, 内核就会把6号文件描述符返回给用户态, 我们再根据<6, SelectionKeyImpl>这个关系,就能找到这个SelectionKeyImpl 了 .
在调用register方法的时候, 不仅会存储<fd, SelectionKeyImpl>的对应关系, 还会将所有的fd存储到 int[] updateDescriptors 中, 也会将 <fd, events>的关系存储到byte[] eventsLow 或 Map<Integer,Byte> eventsHigh中.
一句话, 在上面我们已经生产了一个冰箱, 在这里, 我们把数据(也就是大象)放进这个冰箱里面.
3.把大象从冰箱取出来
selector.select()
关键代码最终会调用到EPollArrayWrapper 这个类里的方法.
会将之前上一步的文件描述符和对应的事件, 通过epoll_ctl系统调用, 放到epoll的红黑树上.
最终会调用到epoll_wait系统调用函数, 如果有文件描述符就绪, 就将对应的文件描述符放到堆外内存的pollArray对象上.
用户态拿到pollArray对象之后, 通过遍历, 根据fd从fdToKey中将SelectionKeyImpl 放到 Set<SelectionKey> selectedKeys集合中, 用户态在调用selector.selectedKeys()的时候, 就会将selectedKeys集合返回 . 这样我们的业务代码就拿到了 selectedKeys集合, 进行后续操作处理.
关于函数之间的调用如下图, 具体也可以查看 https://www.yuque.com/infuq/default/wy8fap#cGAYr
Reactor模型epoll版服务器 - Java语言.png