JAVANIO -选择器02 Selector原理和使用
Selector 概述
Selector 一般称为选择器,用来作为SelectableChannel通道的多路复用器。SelectableChannel类型通道可以被注册到多路复用器,通过多路复用器监听感兴趣的事件,这样就可以通过Selector实现单个线程可以管理多个SelectableChannel通道,从而管理多个网络连接。
image监听事件
当SelectableChannel通道注册到Selector多路复用器时需要指定感兴趣的事件。
//socketChannel注册到选择器中监听读取到达事件
socketChannel.register(selector, SelectionKey.OP_READ);
事件类型定义在SelectionKey类的静态常量中,使用二进制位运算得到的整数。
public abstract class SelectionKey {
...省略代码
//一个通道已准备好读取
public static final int OP_READ = 1 << 0;
//一个通道已准备好写入
public static final int OP_WRITE = 1 << 2;
//与远程服务器建立连接。
public static final int OP_CONNECT = 1 << 3;
// ServerSocketChannel接受连接。
public static final int OP_ACCEPT = 1 << 4;
如果SelectableChannel通道对多个选择键感兴趣,可以使用‘|’位运算后在注册到选择器
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
socketChannel.register(selector, interestSet);
不同SelectableChannel子类(通道)支持的SelectionKey(选择键)不同。
可以通过validOps函数获取其支持的监听事件
//ServerSocketChannel 支持的选择键
public final int validOps() {
return SelectionKey.OP_ACCEPT;
}
//socketChannel 支持的选择键
public final int validOps() {
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
SelectionKey
一个Selector中可以注册多通道,不同通道在选择器中被封装成为SelectionKey对象。
public class SelectionKeyImpl extends AbstractSelectionKey {
/** 通道 **/
final SelChImpl channel;
/** 多路复用器**/
public final SelectorImpl selector;
private int index;
/** 感兴趣的事件 **/
private volatile int interestOps;
/** 就绪的事件**/
private int readyOps;
从SelectionKeyImpl定义可以看出相同通道注册多次感兴趣选择键对应到Selector中SelectionKey对象是同一个。
SocketChannel ch1 = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
serverSocketChannel.close();
SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);
SocketChannel ch2 = serverSocketChannel2.accept();
socketChannel.configureBlocking(false);
SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
image
事件管理机制
Seclect内部存在三个集合来管理SelectionKey。监听事件通道集合(publicKeys),通道事件就绪集合(publicSelectedKeys),取消监听通道(cancelKeys)
- 通过SelectableChannel.register方法可以将Channel通道封装成SelectionKey对象添加到Seclect内部publicKeys集合中。
- 通过Seclect.keys方法获取集合publicKeys集合,但无法手动修改。
- 如果某个通道的事件到达,会将通道对应SelectionKey添加到publicSelectedKeys集合中。用户线程观测SelectionKey集合中SelectionKey对已到达的事件作处理。
- 通过selectedKeys方法获取publicSelectedKeys集合,每次通道处理需要手动删除。避免重复处理。
- 如果通道事件到达,用户未处理就将SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。
- 如果通道事件到达,用户处理后会在下一次调用select方法时将通道对应的SelectionKey从publicKeys集合删除
案例
- 通过SelectableChannel.register方法可以将Channel通道封装成SelectionKey对象添加到keysSet集合中。并通过Seclect。keys方法获取集合publicKeys
/**
* 通过register,可以将Channel通道封装成SelectionKey对象添加到keysSet集合中
*/
@Test
public void test_keysSet_add() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
/** 获取publicKeys集合 **/
Set<SelectionKey> selectionKeys_ = selector.keys();
/** 获取注册到选择器中Channel通道的数量**/
System.out.println("selector.keys.size()" + selector.keys());
/** 遍历SelectionKey**/
Iterator<SelectionKey> iterator_ = selectionKeys_.iterator();
while (iterator_.hasNext()) {
SelectionKey key = iterator_.next();
System.out.println(key);
}
}
register1sun.nio.ch.SelectionKeyImpl@7f560810
register2sun.nio.ch.SelectionKeyImpl@69d9c55
selector.keys.size()[sun.nio.ch.SelectionKeyImpl@7f560810, sun.nio.ch.SelectionKeyImpl@69d9c55]
sun.nio.ch.SelectionKeyImpl@7f560810
sun.nio.ch.SelectionKeyImpl@69d9c55
- 同一个通道注册多次事件,其注册到keysSet中SelectionKey是同一个对象
@Test
public void test_keysSet_add2_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
boolean is_Run=true;
while (is_Run) {
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
serverSocketChannel.close();
SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);
// 同一个通道注册多次事件,返回的都是同一个 SelectionKey
System.out.println(register3==register4);
}
}
}
}
@Test
public void test_keysSet_add2_client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
SocketChannel socketChannel2 = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel2.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
socketChannel2.connect(new InetSocketAddress("localhost", 7777));
} catch (Exception e) {
e.printStackTrace();
}
}
true
true
无法手动对publicKeys集合做修改,如修改会抛出UnsupportedOperationException异常
@Test
public void test_keysSet_del1() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
/** 获取publicKeys集合 **/
Set<SelectionKey> selectionKeys_ = selector.keys();
//手动删除会抛出异常
selectionKeys_.remove(register2);
}
- 如果注册在选择器中通道对应SelectionKey被关闭,选择器会在publicKeys集合中标记这个SelectionKey,并在下次调用selector.select()方法后从publicKeys集合中删除
@Test
public void test_keysSet_del3_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
// 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行后将关闭通道SelectionKey从集合中删除
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(15);
Set<SelectionKey> selectionKeys = selector.keys();
System.out.println(selectionKeys);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
boolean is_Run=true;
while (is_Run) {
// 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行前publicKeys集合
Set<SelectionKey> selectionKeys2 = selector.keys();
System.out.println(selectionKeys2);
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel socketChannel = serverSocketChannel.accept();
//如果注册在选择器中通道对应SelectionKey被关闭,选择器会在publicKeys集合中标记这个SelectionKey,并在下次selector.select()方法后删除
key.cancel();
System.out.println("selector.keys pre_del()");
}
}
}
}
@Test
public void test_keysSet_del3_client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
} catch (Exception e) {
e.printStackTrace();
}
}
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
selector.keys pre_del()
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
[sun.nio.ch.SelectionKeyImpl@13a57a3b]
- 如果注册在选择器中通道被关闭时,选择器会在publicKeys集合中标记这个通道对应的SelectionKey,并在下次调用selector.select()方法后从publicKeys集合中删除
@Test
public void test_keysSet_del2_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
/** 用来查看通道被关闭后并调用selector.select()方法后keys集合会将关闭通道SelectionKey从集合中删除 **/
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(15);
Set<SelectionKey> selectionKeys = selector.keys();
System.out.println(selectionKeys);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
boolean is_Run=true;
while (is_Run) {
// 在通道关闭后,select()方法执行前答应下当前keys集合
Set<SelectionKey> selectionKeys2 = selector.keys();
System.out.println(selectionKeys2);
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel socketChannel = serverSocketChannel.accept();
//选择器中通道被关闭后,选择器会在keysSet集合中标记,并在下次selector.select()方法后删除
serverSocketChannel.close();
System.out.println("selector.keys pre_del()");
}
}
}
}
@Test
public void test_keysSet_del2_client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
} catch (Exception e) {
e.printStackTrace();
}
}
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
selector.keys pre_del()
[sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
[sun.nio.ch.SelectionKeyImpl@13a57a3b]
- 在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发生
@Test
public void test_publicSelectedKeys2_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
boolean is_Run=true;
while (is_Run) {
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel==null){
System.out.println("重复连接");
}
InetSocketAddress localAddress = (InetSocketAddress) serverSocketChannel.getLocalAddress();
System.out.println(localAddress.getPort()+"被连接了");
System.out.println("isAcceptable");
}
}
}
}
@Test
public void test_publicSelectedKeys2_client1() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void test_publicSelectedKeys2_client2() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 7777));
} catch (Exception e) {
e.printStackTrace();
}
}
1 注释掉iterator.remove();,顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client2
8888被连接了
isAcceptable
重复连接
8888被连接了
isAcceptable
7777被连接了
isAcceptable
2 打开注释iterator.remove();顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client
8888被连接了
isAcceptable
7777被连接了
isAcceptable
不同的事件处理
OP_READ:
-
如果通道注册了OP_READ事件,当服务端或客户端收到对方数据请求时,会将通道对应SelectionKey添加到publicSelectedKeys集合,用户线程遍历需SelectedKeys集合获取通道,调用read方法读取数据。读取数据完毕,就可以将SelectionKey从publicSelectedKeys集合中删除。
-
如果不处理直接将通道对应SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。
@Test
public void test_publicSelectedKeys_server2() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
boolean is_Run=true;
while (is_Run) {
/** 阻塞等待事件到达**/
System.out.println("selector.selectedKeys:"+selectionKeys);
selector.select();
System.out.println("selector.selectedKeys:"+selectionKeys);
/** 获取到达事件SelectionKey集合**/
selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println("isAcceptable");
}else if(key.isReadable()){
SocketChannel socketChannel = (SocketChannel)key.channel();
//注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
//ByteBuffer allocate = ByteBuffer.allocate(50);
//socketChannel.read(allocate);
//System.out.println(new String(allocate.array()));
}
}
}
}
@Test
public void test_publicSelectedKeys_client2() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
OP_WRITE
如果通道注册了OP_READ事件,选择器会自动将通道对应SelectionKey添加到publicSelectedKeys集合,,用户线程遍历需SelectedKeys集合获取通道,调用write方法发送数据。发送数据完毕,就可以将SelectionKey从publicSelectedKeys集合中删除。不同下次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中,也就是无法停止,这时我们只能通过设置SelectionKey.interestOps()重新设置事件,将OP_WRITE事件去掉,那么下次下次调用select方法时就不会重新添加到publicSelectedKeys集合中。
@Test
public void test_publicSelectedKeys_client3() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 7777));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
boolean is_Run = true;
while (is_Run) {
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isConnectable()) {
socketChannel = (SocketChannel) key.channel();
while (!socketChannel.finishConnect()){
}
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_READ(当客户端请求数据时事件到达被添加到selectedKeys集合中) **/
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if (key.isReadable()) {
socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int read = socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
}
}
}
OP_ACCEPT
-
如果通道注册了OP_ACCEPT事件,当客户端收向服务端请求连接时,会将通道对应SelectionKey添加到publicSelectedKeys集合,用户线程遍历需SelectedKeys集合获取通道,调用serverSocketChannel.accept()处理后就可以将SelectionKey从publicSelectedKeys集合中删除。
-
如果不处理直接将通道对应SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。
@Test
public void test_publicSelectedKeys_server1() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
boolean is_Run=true;
while (is_Run) {
/** 阻塞等待事件到达**/
System.out.println("selector.selectedKeys:"+selectionKeys);
selector.select();
System.out.println("selector.selectedKeys:"+selectionKeys);
/** 获取到达事件SelectionKey集合**/
selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
//注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
//serverSocketChannel.accept();
System.out.println("isAcceptable");
}
}
}
}
@Test
public void test_publicSelectedKeys_client1() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 8888));
} catch (Exception e) {
e.printStackTrace();
}
}
select
select()方法提供给用户线程判断是否存在就绪通道,如果不存在就绪通道则会阻塞当前线程,直到某个通道的事件到达。
处理流程
image需要注意如下特性
-
线程因为select()导致的阻塞可以被interrupt()中断而释放
-
线程因为select()导致的阻塞可以被close()中断而释放
-
线程因为select()导致的阻塞可以被wakeup()中断而释放
案例
- select()阻塞被interrupt()唤醒
/**
* interrupt()中断函数可以用来释放所有selector.select()阻塞
*/
@Test
public void test_select() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
Thread currentThread = Thread.currentThread();
/** 开启一个线程5s后调用close()方法关闭选择器**/
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
currentThread.interrupt();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
/** 阻塞等待事件到达**/
//可以被中断
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel accept = socketChannel.accept();
accept.close();
}
}
serverSocketChannel.close();
System.out.println("over");
}
//over
- select()阻塞可以被close()中断
@Test
public void test_select3() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
Thread currentThread = Thread.currentThread();
/** 开启一个线程5s后调用close()方法关闭选择器**/
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
/** 阻塞等待事件到达**/
//可以被中断
selector.select();
serverSocketChannel.close();
System.out.println("over");
}
- select()阻塞被wakeup()唤醒
@Test
public void test_select1() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
Thread currentThread = Thread.currentThread();
/** 开启一个线程5s后调用wakeup()方法**/
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
selector.wakeup();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
/** 阻塞等待事件到达**/
//可以被中断
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel accept = socketChannel.accept();
accept.close();
}
}
serverSocketChannel.close();
System.out.println("over");
}
- selector.select()返回的是publicSelectedKeys变化值,如一个通道监听了连接OP_ACCEPT事件并阻塞,当客户端连接事件到达,线程从阻塞中唤醒返回1,表明有一个通道添加到了publicSelectedKeys集合中。如果事件被处理后并没有从publicSelectedKeys集合中删除,客户端在次发起连接事件到达,线程在次从阻塞中唤醒返回值0,因为当前通道已经存在与publicSelectedKeys集合中
@Test
public void test_select4_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel2.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel3 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel3.bind(new InetSocketAddress("localhost", 6666));
/** 设置为非阻塞IO模型 **/
serverSocketChannel3.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
SelectionKey register3 = serverSocketChannel3.register(selector, SelectionKey.OP_ACCEPT);
Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
boolean is_Run=true;
while (is_Run) {
/** 阻塞等待事件到达**/
int updatekey = selector.select();
/** 获取到达事件SelectionKey集合**/
selectionKeys = selector.selectedKeys();
System.out.println("updatekey:"+updatekey);
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
//iterator.remove();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
}
}
}
}
@Test
public void test_select4_client1() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 6666));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void test_select4_client2() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
SocketChannel socketChannel1 = SocketChannel.open();
try {
socketChannel.configureBlocking(true);
socketChannel1.configureBlocking(true);
socketChannel.connect(new InetSocketAddress("localhost", 7777));
socketChannel1.connect(new InetSocketAddress("localhost", 8888));
} catch (Exception e) {
e.printStackTrace();
}
}
close
/**
* 调用close()关闭选择器会导致
*
* 1 释放所有selector.select()阻塞
*
* 2 除了再次调用close()和wakeup()方法外,调用selector()的其他方法均出现异常。
*
* 3 如果选择器已经关闭则在次调用此方法将不起作用
*/
@Test
public void test_close() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 开启一个线程5s后调用close()方法关闭选择器**/
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模型 **/
serverSocketChannel.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
//此出抛出异常
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()){
SelectionKey key = iterator.next();
/** 判断是否是OP_ACCEPT事件**/
if(key.isAcceptable()){
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
/** 获取SocketChannel**/
SocketChannel accept = socketChannel.accept();
accept.close();
}
}
serverSocketChannel.close();
}