4 Channel
1 Channel简介
通道(Channel)可以理解为数据传输的管道。通道与流不同的是,流只是在一个方向上移动(一个流必须是inputStream或者outputStream的子类),而通道可以用于读、写或者同时用于读写。
channel 类的继承关系
为了保证尽可能清晰的显示我们关注的点,图中只显示了我们关心的Channel。
I/O 可以分为广义的两大类别: File I/O 和 Stream I/O。
那么相应地有两种类型的通道也就不足为怪了,它们是文件( file)通道和套接字( socket)通道。仔细看一下上图,你会发现有一个 FileChannel 类和三个 socket 通道类: SocketChannel、 ServerSocketChannel 和 DatagramChannel。
通道可以是单向( unidirectional)或者双向的( bidirectional)。一个 channel 类可能实现定义read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以提供 write( )方法。
public interface ReadableByteChannel extends Channel {
public int read(ByteBuffer dst) throws IOException;
}
public interface WritableByteChannel extends Channel{
public int write(ByteBuffer src) throws IOException;
}
实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。
可以看到read和write方法接受的都是一个ByteBuffer参数,其中read方法,就是往ByteBuffer中put数据,write方法就是将ByteBuffer中的数据get出来,以便发送给其他远程主机。
两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的position位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining( )方法返回 false 值。
在上面的类图中,我们可以看到FileChannel、SocketChannel通道都实现了这两个接口。从类定义的角度而言,这意味着FileChannel、SocketChannel 通道对象都是双向的。这对于 SocketChannel 不是问题,因为它们一直都是双向的,不过对于FileChannel 却是个问题了。
我们知道,一个文件可以在不同的时候以不同的权限打开。从 FileInputStream 对象的getChannel( )方法获取的 FileChannel 对象是只读的,不过从接口声明的角度来看却是双向的,因为FileChannel 实现 ByteChannel 接口。在这样一个通道上调用 write( )方法将抛出未经检查的NonWritableChannelException 异常,因为 FileInputStream 对象总是以 read-only 的权限打开文件。
通道可以以多种方式创建。 Socket 通道有可以直接创建新 socket 通道的工厂方法。但是一个FileChannel 对象却只能通过在一个打开的 RandomAccessFile、 FileInputStream 或 FileOutputStream对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。
SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));
ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));
DatagramChannel dc = DatagramChannel.open( );
RandomAccessFile raf = new RandomAccessFile ("somefile", "r");
FileChannel fc = raf.getChannel( );
java.net 的 socket 类也有新的 getChannel( )方法。这些方法虽然能返回一个相应的 socket 通道对象,但它们却并非新通道的来源,RandomAccessFile.getChannel( )方法才是。只有在已经有通道存在的时候,它们才返回与一个 socket 关联的通道;它们永远不会创建新通道。
2 Socket通道详解
在通道类中,DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel不实现。 ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传输数据。
全部 NIO中的socket 通道类( DatagramChannel、 SocketChannel 和 ServerSocketChannel)在被实例化时都会创建一个对等的BIO中的 socket 对象( Socket、 ServerSocket和 DatagramSocket)。
DatagramChannel、 SocketChannel 和 ServerSocketChannel通道类都定义了socket()方法,我们可以通过这个方法获取其关联的socket对象。另外每个Socket、 ServerSocket和 DatagramSocket都定义了getChannel()方法,来获取对应的通道。
需要注意是,只有通过通道类创建的socket对象,其getChannel方法才能返回对应的通道,如果直接new了socket对象,那么其getChannel方法返回的永远是null。
非阻塞模式
通道可以以阻塞( blocking)或非阻塞( nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。
这个陈述虽然简单却有着深远的含义。传统 Java socket的阻塞性质曾经是 Java 程序可伸缩性的最重要制约之一。非阻塞 I/O 是许多复杂的、高性能的程序构建的基础。
回顾我们之前讲解的BIO编程中,不能"以尽可能少的线程,处理尽可能多的client请求"
,就是因为通过Socket的getInputStream方法的read方法是阻塞的,一旦没有数据可读,处理线程就会被一直被block住。
默认情况下,一个通道创建,总是阻塞的,我们可以通过调用configureBlocking(boolean)方法即可,传递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。而 isBlocking()方法来判断某个 socket 通道当前处于哪种模式
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
...
if ( ! sc.isBlocking( )) {
doSomething (cs);
}
偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。 API 中有一个 blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式,对于确保在执行代码的关键部分时 socket 通道的阻塞模式不会改变
以及在不影响其他线程的前提下暂时改变阻塞模式来说,这个方法都是非常方便的。
Socket socket = null;
Object lockObj = serverChannel.blockingLock( );
// 执行关键代码部分的时候,使用这个锁进行同步
synchronize (lockObj)
{
// 一旦进入这个部分,锁就被获取到了,其他线程不能改变这个channel的阻塞模式
boolean prevState = serverChannel.isBlocking( );
serverChannel.configureBlocking (false);
socket = serverChannel.accept( );
serverChannel.configureBlocking (prevState);
}
// 释放锁,此时其他线程可以修改channel的阻塞模式
if (socket != null) {
doSomethingWithTheSocket (socket);
}
2.1 ServerSocketChannel
让我们从最简单的 ServerSocketChannel 来开始对 socket 通道类的讨论。
ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟悉的 java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。
用静态的 open( )工厂方法创建一个新的 ServerSocketChannel 对象,将会返回同一个未绑定的java.net.ServerSocket 关联的通道。该对等 ServerSocket 可以通过在返回的 ServerSocketChannel 上调用 socket( )方法来获取。作为 ServerSocketChannel 的对等体被创建的 ServerSocket 对象依赖通道实现。这些 socket 关联的 SocketImpl 能识别通道。通道不能被封装在随意的 socket 对象外面。
由于 ServerSocketChannel 没有 bind( )方法,因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。
ServerSocketChannel ssc = ServerSocketChannel.open( );
ServerSocket serverSocket = ssc.socket( );
// Listen on port 1234
serverSocket.bind (new InetSocketAddress (1234));
同它的对等体 java.net.ServerSocket 一样, ServerSocketChannel 也有 accept( )方法。一旦您创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。
如果以非阻塞模式被调用,当没有传入连接在等待时, ServerSocketChannel.accept( )会立即返回 null。
正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个 ServerSocketChannel 对象以实现新连接到达时自动通知的功能。下面的代码演示了如何使用一个非阻塞的 accept( )方法:
public class ChannelAccept {
public static final String GREETING = "Hello I must be going.\r\n";
public static void main(String[] argv)
throws Exception {
int port = 1234; // default
if (argv.length > 0) {
port = Integer.parseInt(argv[0]);
}
ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(port));
ssc.configureBlocking(false);
while (true) {
System.out.println("Waiting for connections");
SocketChannel sc = ssc.accept();
if (sc == null) {
// no connections, snooze a while
Thread.sleep(2000);
} else {
sc.configureBlocking(false);
ByteBuffer allocate = ByteBuffer.allocateDirect (16 * 1024);
while(sc.read(allocate)>0){
allocate.flip();
while (buffer.hasRemaining( )) {
byte b = buffer.get();
System.out.println(b);
}
allocate.clear();
}
System.out.println("Incoming connection from: "
+ sc.socket().getRemoteSocketAddress());
buffer.rewind();
sc.write(buffer);
sc.close();
}
}
}
}
这段程序的作用是,在1234端口上接受client的请求,一旦接收到client的请求,会给其回复固定的字符串响应"Hello I must be going."
运行这段程序,可以看到控制台打印出
Waiting for connections
Waiting for connections
Waiting for connections
Waiting for connections
......
说明程序的确运行在非阻塞模式下,因为否则就会想ServerSocket.accpet方法那样,一直阻塞下去。
现在通过命令行执行
telnet localhost 1234
可以看到得到一个响应之后,连接立马关闭
image.png
2.2 SocketChannel
下面开始学习 SocketChannel,它是使用最多的 socket 通道类:
Socket 和 SocketChannel 类封装点对点、有序的网络连接,就是我们所熟知并喜爱的 TCP/IP网络连接。 SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收。
每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。
新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket 通道被连接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个SocketChannel 当前是否已连接。
下面两段代码是等价的
通过open方法
SocketChannel socketChannel =
SocketChannel.open (new InetSocketAddress ("somehost", somePort));
通过connect方法
SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));
如果您选择使用传统方式进行连接——通过在对等 Socket 对象上调用 connect( )方法,那么传统的连接语义将适用于此。线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用 connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么连接过程实际上是一样的。
在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时( timeout)值,当 connect( )方法在非阻塞模式下被调用时 SocketChannel 提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是 true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立, connect( )方法会返回 false 且并发地继续连接建立过程。
面向流的的 socket 建立连接状态需要一定的时间,因为两个待连接系统之间必须进行包对话以建立维护流 socket 所需的状态信息。跨越开放互联网连接到远程系统会特别耗时。假如某个SocketChannel 上当前正由一个并发连接, isConnectPending( )方法就会返回 true 值。
调用 finishConnect( )方法来完成连接过程,该方法任何时候都可以安全地进行调用。假如在一个非阻塞模式的 SocketChannel 对象上调用 finishConnect( )方法,将可能出现下列情形之一:
-
connect( )方法尚未被调用。那么将产生 NoConnectionPendingException 异常。
-
连接建立过程正在进行,尚未完成。那么什么都不会发生, finishConnect( )方法会立即返回false 值。
-
在非阻塞模式下调用 connect( )方法之后, SocketChannel 又被切换回了阻塞模式。那么如果有必要的话,调用线程会阻塞直到连接建立完成, finishConnect( )方法接着就会返回 true值。
-
在初次调用 connect( )或最后一次调用 finishConnect( )之后,连接建立过程已经完成。那么SocketChannel 对象的内部状态将被更新到已连接状态, finishConnect( )方法会返回 true值,然后 SocketChannel 对象就可以被用来传输数据了。
-
连接已经建立。那么什么都不会发生, finishConnect( )方法会返回 true 值。
当通道处于中间的连接等待( connection-pending)状态时,您只可以调用 finishConnect( )、isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成, isConnected( )将返回 true值。
InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );
3 Channel案例
在这个案例中,我们使用nio中的channel+线程池,来实现TimeServer、TimeClient
Server端
public class TimeServer {
private BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>();
private BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>();
private ExecutorService executor = Executors.newSingleThreadExecutor();
{
new Thread(){
@Override
public void run() {
try {
while (true) {
//task1:迭代当前idleQueue中的SocketChannel,提交到线程池中执行任务,并将其移到workingQueue中
for (int i = 0; i < idleQueue.size(); i++) {
SocketChannel socketChannel = idleQueue.poll();
if (socketChannel != null) {
Future<SocketChannel> result = executor.submit(new TimeServerHandleTask(socketChannel), socketChannel);
workingQueue.put(result);
}
}
//task2:迭代当前workingQueue中的SocketChannel,如果任务执行完成,将其移到idleQueue中
for (int i = 0; i < workingQueue.size(); i++) {
Future<SocketChannel> future = workingQueue.poll();
if (!future.isDone()){
workingQueue.put(future);
continue;
}
SocketChannel channel = null;
try {
channel = future.get();
idleQueue.put(channel);
} catch (ExecutionException e) {
//如果future.get()抛出异常,关闭SocketChannel,不再放回idleQueue
channel.close();
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
public static void main(String[] args) throws IOException, InterruptedException {
TimeServer timeServer = new TimeServer();
ServerSocketChannel ssc=ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(8080));
while (true){
SocketChannel socketChannel = ssc.accept();
if(socketChannel==null){
continue;
}else{
socketChannel.configureBlocking(false);
timeServer.idleQueue.add(socketChannel);
}
}
}
}
TimeServerHandleTask
public class TimeServerHandleTask implements Runnable {
SocketChannel socketChannel;
public TimeServerHandleTask(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
ByteBuffer requestBuffer = ByteBuffer.allocate("GET CURRENT TIME".length());
//尝试读取数据,因为是非阻塞,所以如果没有数据会立即返回。
int bytesRead = socketChannel.read(requestBuffer);
//如果没有读取到数据,说明当前SocketChannel并没有发送请求,不需要处理
if (bytesRead <= 0) {
return;
}
//如果读取到了数据,则需要考虑粘包、解包问题,这个while代码是为了读取一个完整的请求信息"GET CURRENT TIME",
while (requestBuffer.hasRemaining()) {
socketChannel.read(requestBuffer);
}
String requestStr = new String(requestBuffer.array());
if (!"GET CURRENT TIME".equals(requestStr)) {
String bad_request = "BAD_REQUEST";
ByteBuffer responseBuffer = ByteBuffer.allocate(bad_request.length());
responseBuffer.put(bad_request.getBytes());
responseBuffer.flip();
socketChannel.write(responseBuffer);
} else {
String timeStr = Calendar.getInstance().getTime().toLocaleString();
ByteBuffer responseBuffer = ByteBuffer.allocate(timeStr.length());
responseBuffer.put(timeStr.getBytes());
responseBuffer.flip();
socketChannel.write(responseBuffer);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
TimeServer中维护了两个队列,idleQueue 和workingQueue。
工作步骤如下所示:
- 在main线程中,当接受到一个新的连接时,我们将相应的SocketChannel放入idleQueue。
- 在static静态代码块中,我们创建了一个Thread。其作用是不断的循环idleQueue和workingQueue。
首先循环idleQueue,迭代出其中的SocketChannel,然后封装成一个TimeServerHandleTask对象,提交到线程池中处理这个SocketChannel的请求,同时我们会将SocketChannel中移除,放到workingQueue中。需要注意的是,这个SocketChannel可能只是与服务端建立了连接,但是没有发送请求,又或者是发送了一次或者多次请求。发送一次"GET CURRENT TIME”,就相当于一次请求。在TimeServerHandleTask中,会判断是否发送了请求,如果没有请求则不需要处理。如果SocketChannel发送了多次请求,TimeServerHandleTask一次也只会处理一个请求。其他的请求等到下一次循环的时候再处理。因为使用线程池的情况,线程的数量有限,所以要合理的分配,不能让一个线程一直处理一个client的请求。
接着是迭代workingQueue,通过future.isDone()判断当前请求是否处理完成,如果处理完成,将其从workingQueue中移除,重新加入idleQueue中。
client端
public class TimeClient {
//连接超时时间
static int connectTimeOut=3000;
static ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
socketChannel.configureBlocking(false);
long start=System.currentTimeMillis();
while (!socketChannel.finishConnect()){
if (System.currentTimeMillis()-start>=connectTimeOut){
throw new RuntimeException("尝试建立连接超过3秒");
}
}
//如果走到这一步,说明连接建立成功
while (true){
buffer.put("GET CURRENT TIME".getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
if(socketChannel.read(buffer)>0){
buffer.flip();
byte[] response=new byte[buffer.remaining()];
buffer.get(response);
System.out.println("reveive response:"+new String(response));
buffer.clear();
}
Thread.sleep(5000);
}
}
}
先运行server端,再运行client端,可以在client端看到类似以下的输出
reveive response:2016-12-18 21:52:09
reveive response:2016-12-18 21:52:14
reveive response:2016-12-18 21:52:19
reveive response:2016-12-18 21:52:24
reveive response:2016-12-18 21:52:29
...
到这里,我们好像已经可以达到我们的目标"以尽可能少的线程,处理尽可能多的client请求"
,但是现实总是残酷的,这个案例中代码的效率太低了。
因为我们并不知道一个SocketChannel是否发送了请求,所以必须迭代所有的SocketChannel,然后尝试读取请求数据,如果有请求,就处理,否则就跳过。假设一个有10000个连接,前9999个连接都没有请求,刚好最后一个连接才有请求。那么前9999次任务处理都是没有必要的。
如果有一种方式,可以让我们直接获取到真正发送了请求的SocketChannel,那么效率将会高的多。
这就是我们下一节将要讲解的Selector(选择器),其可以帮助我们管理所有与server端已经建立了连接的client(SocketChannel),并将准备好数据的client过滤出来。我们可以有一个专门的线程来运行Selector,将准备好数据的client交给工作线程来处理。