五.Selector
一.Selector是什么
- Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。
- 一个通道可以被注册到多个Selector选择器上,但对每个选择器而言只能被注册一次。
- SelectionKey:选择键封装了特定的通道与特定的Selector选择器的注册关系
connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT
accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT
read:读事件,对应值为SelectionKey.OP_READ
write:写事件,对应值为SelectionKey.OP_WRITE
二.层次图
image.png- 1.Selector实现open方法,我们发现SelectorProvider类。SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
- 2.AbstractSelector取消的key放在一个set集合中,对集合进行添加操作时,必须同步取消key set集合。反注册选择key完成的实际工作是,将key从key对应的通道的选择key数组中移除。
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
- 3.SelectorImpl:
- 其他线程获取选择器的就绪key和key集合,实际上返回的是key集合的代理publicKeys和就绪key集合的代理publicSelectedKeys。
protected Set selectedKeys;//已经操作事件准备就绪的选择key(为解决1.4bug而存在)
protected HashSet keys;//与选择器关联的key集合(为解决1.4bug而存在)
private Set publicKeys;//外部访问key集合的代理,真正使用的
private Set publicSelectedKeys;//外部访问就绪key集合代理,真正使用的 ;
public Set<SelectionKey> selectedKeys() {...return this.publicSelectedKeys; }}
public Set<SelectionKey> keys() { ....return this.publicKeys;}
- select方法(准备就绪事件数):委托给为lockAndDoSelect同步方法,获取key集合代理publicKeys和就绪key代理集合publicSelectedKeys,然后交给doSelect(long l)方法,这个方法为抽象方法,待子类扩展。
public int select() throws IOException {
return this.select(0L);
}
private int lockAndDoSelect(long var1//**超时时间**/) throws IOException {
synchronized(this) {
if(!this.isOpen()) {
throw new ClosedSelectorException();
} else {
Set var4 = this.publicKeys;
int var10000;
synchronized(this.publicKeys) {
Set var5 = this.publicSelectedKeys;
synchronized(this.publicSelectedKeys) {
var10000 = this.doSelect(var1);
}
}
return var10000;
}
}
}
protected abstract int doSelect(long var1) throws IOException;
- register方法:
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if(!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
//设置key的附加物
var4.attach(var3);
Set var5 = this.publicKeys;
synchronized(this.publicKeys) {
//完成实际的注册工作
this.implRegister(var4);
}
//设置key的兴趣事件集 ,var2是 SelectionKey关系值
var4.interestOps(var2);
return var4;
}
}
//待子类实现
protected abstract void implRegister(SelectionKeyImpl selectionkeyimpl);
背景知识
1.FileDescriptor:文件描述符用于描述系统底层的特殊的结构句柄,可以被用来表示开放文件、开放
套接字等,包含 private int fd;//文件描述值 和private long handle;//初始化文件描述句柄
2.句柄:标识应用程序中的不同对象应用程序能够通过句柄访问相应的对象的信息,但是句柄不是指针,程序不能利用句柄来直接阅读文件中的信息。如果句柄不在IO文件中,它是毫无用处的。 句柄是Windows用来标志应用程序中建立的或是使用的唯一整数,大量使用了句柄来标识对象。
- 4.WindowsSelectorImpl
全局变量
private final int INIT_CAP = 8;//选择key集合,key包装集合初始化容量
private static final int MAX_SELECTABLE_FDS = 1024;//最大选择key数量
private SelectionKeyImpl channelArray[];//选择器关联通道集合
private PollArrayWrapper pollWrapper;//存放所有文件描述对象(选择key,唤醒管道的源与sink通道)的集合
private final List threads = new ArrayList();//选择操作线程集合
private final FdMap fdMap = new FdMap();//存放选择key文件句柄与选择key映射关系的Map
private final SubSelector subSelector = new SubSelector();//子选择器
private int totalChannels;//注册到选择器的通道数量
private int threadsCount;//选择线程数
private final Pipe wakeupPipe = Pipe.open();//唤醒等待选择操的管道
private final int wakeupSourceFd;//唤醒管道源通道文件句柄
private final int wakeupSinkFd;//唤醒管道sink通道文件句柄
//四个同步锁
private Object closeLock;//选择器关闭同步锁
private final Object interruptLock = new Object();//中断同步锁,在唤醒选择操作线程时,用于同步
private final StartLock startLock = new StartLock();//选择操作开始锁
private final FinishLock finishLock = new FinishLock();//选择操作结束锁
//初始化
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();//获取句柄
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
var2.sc.socket().setTcpNoDelay(true);
this.wakeupSinkFd = var2.getFDVal();//获取句柄
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
fdMap:存放选择key文件句柄与选择key的HashMap
private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) {
return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1));
}
pollWrapper: 存放选择key和通道及其相关兴趣事件到本地内存
private static final short EVENT_OFFSET = 4;//兴趣事件开始位置
static short SIZE_POLLFD = 8;//句柄长度int(4)+兴趣事件(4)
void addWakeupSocket(int var1//索引, int var2//句柄) {
this.putDescriptor(var2, var1);
this.putEventOps(var2, 1);
}
//将文件描述放在索引var2上
void putDescriptor(int var1, int var2) {
this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}
//存放索引文件描述信息的兴趣操作事件
void putEventOps(int var1, int var2) {
this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}
image.png
背景知识:
在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。在linux2.6(准确来说是2.5.44)由内核直接支持的方法。epoll解决了select和poll的缺点。epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,而不是在等待的时候重复拷贝,保证了每个fd在整个过程中只会拷贝1次。epoll它所支持的fd上限是最大可以打开文件的数目,具体数目可以cat /proc/sys/fs/file-max查看,一般来说这个数目和系统内存关系比较大。epoll在注册新的事件时,为每个fd指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的fd加入一个就绪表中。(所以epoll实际只需要遍历就绪表)。
参考:Linux下I/O多路复用select, poll, epoll 三种模型的Python使用
doSelect方法:其中 subSelector.poll() 是select的核心,由native函数poll0实现,SubSelector主要有两个方法以poll从pollWrapper拉取关注读写事件的选择key。每个SelectThread使用,SubSelector从当前注册到选择器的通道中选取SubSelector索引所对应的批次的通道已经就绪的通道并更新操作事件。整个选择过程有startLock和finishLock来控制。再有在一个选择操作的所有子选择线程执行完,才释放finishLock
protected int doSelect(long var1) throws IOException {
......
this.subSelector.poll();
.....
}
private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}
private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
SelectionKeyImpl保存注册时的channel、selector、event以及保存在pollWrapper的偏移位置index。
implRegister方法:首先同步关闭锁,以防在注册的过程中,选择器被关闭;检查选择器是否关闭,没有关闭,则检查是否扩容,需要则扩容为pollWrapper为原来的两倍;检查过后,添加选择key到选择器通道集合,设置key在选择器通道集合的索引,添加选择key到文件描述fdMap,添加key到key集合,将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper,通道计数器自增。
protected void implRegister(SelectionKeyImpl selectionkeyimpl)
{
//同步关闭锁,以防在注册的过程中,选择器被关闭
synchronized(closeLock)
{
if(pollWrapper == null)
//文件描述包装集合为null,即选器已关闭
throw new ClosedSelectorException();
growIfNeeded();//
channelArray[totalChannels] = selectionkeyimpl;//添加到选择器通道集合
selectionkeyimpl.setIndex(totalChannels);//设置key在选择器通道集合的索引
fdMap.put(selectionkeyimpl);//添加选择key到文件描述fdMap
keys.add(selectionkeyimpl);//添加key到key集合
//将选择key添加到文件描述信息及关注操作事件包装集合pollWrapper
pollWrapper.addEntry(totalChannels, selectionkeyimpl);
totalChannels++;//通道计数器自增
}
}
void addEntry(int var1, SelectionKeyImpl var2) {
//epoll每次注册新的事件到epoll中,会把所有的fd(文件标识符)拷贝进内核,
this.putDescriptor(var1, var2.channel.getFDVal());
}
参考:
NIO源码分析
深入浅出NIO Socket实现机制
三.总结
- Selector是通过implRegister方法把每次注册新的SelectionKeyImpl事件拷贝到pollWrapper内存数组中,通过doSelect()的native函数poll0()拉取读写就绪的SelectionKeyImpl事件,如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回。
- ServerSocketChannelImpl的初始化主要是初始化ServerSocket通道线程thread,地址绑定,接受连接同步锁,默认创建ServerSocketChannelImpl的状态为未初始化,文件描述和文件描述id,如果使用本地地址,则获取本地地址。bind首先检查ServerSocket是否关闭,是否绑定地址,如果既没有绑定也没关闭,则检查绑定的socketaddress是否正确或合法;然后通过Net工具类的bind(native)和listen(native),完成实际的ServerSocket地址绑定和开启监听,如果绑定是开启的参数小于1,则默认接受50个连接。accept方法主要是调用accept0(native)方法接受连接,并根据接受来接
一旦有对应的事件发生,poll0方法就会返回。 - SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,最后尝试连接socket地址。从缓冲区读取字节序列写到通道write(ByteBuffer),首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区, 以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
channel的源码笔者还未细看,有兴趣的可以看参考部分。
参考:SocketChannelImpl 解析一(通道连接,发送数据)
四.实例(聊天室的实现)
public class TestNonBlockingNIO1 {
//客户端
@Test
public void client() throws IOException{
//1. 获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
//2. 切换非阻塞模式
sChannel.configureBlocking(false);
//3. 分配缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//4. 发送数据给服务端
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
//5. 关闭通道
sChannel.close();
}
//服务端
@Test
public void server() throws IOException{
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切换非阻塞模式
ssChannel.configureBlocking(false);
//3. 绑定端口号
ssChannel.bind(new InetSocketAddress(9898));
//4. 获取选择器
Selector selector = Selector.open();
//5. 将通道注册到选择器上, 并且指定“监听事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 轮询监听选择器上的“准备就绪”的事件
while(selector.select() > 0){
//7. 获取当前选择器上所有“准备就绪”的选择键(监听事件)
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
//8. 获取当前准备就绪的选择键
SelectionKey sk = it.next();
//9. 判断具体是哪个事件“准备就绪”
if(sk.isAcceptable()){
//10.若接收状态就绪,获取当前客户端的连接
SocketChannel sChannel = ssChannel.accept();
//11.切换非阻塞式
sChannel.configureBlocking(false);
//12.将该通道注册到选择器上
sChannel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
//13.若“读就绪”,获取当前选择器上就绪状态的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
//14.读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while((len = sChannel.read(buf)) > 0){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
//15.取消选择键
it.remove();
}
}
}
}