Java AIO基础
Java AIO(异步IO)特性是在Java7引入的。
[TOC]
同步异步、阻塞非阻塞的理解
同步和异步
同步和异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询(主动同步)内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
阻塞和非阻塞
阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。
同步和异步描述的是调用方和被调用方之间的状态的关系,同步情况下,调用方和被调用方的状态是需要同步的,即被调用方返回结果后,调用方才会产生结果,异步的情况即二者的返回状态不是同步的,调用方调用了被调用方之后,无需后续步骤(调用之后,忘记这个调用)。
阻塞和非阻塞是调用者自身的状态,阻塞是无响应直到完成,非阻塞则是不管是否执行完成都立即返回。
所以,阻塞非阻塞是站在进程的角度上来审视进程的执行形态的,这个状态描述和同步、异步不是互斥也不是相关的,他们描述的是不同层次的形态。比如A进程向B进程发起一个请求,以HTTP请求为例,如果A发出请求后依赖B返回的响应(依赖这个结果,否则无法继续),那么A请求B这个过程就是同步的,也许B处理这个请求的方式可能是阻塞也可能是非阻塞的,但是都不影响我们描述A->B这个请求过程是同步还是异步,决定元素是调用的模型。
如果A通过ajax方式向B发送请求,A无需等待B返回结果,可以直接做后面的事情,也不需要再关心响应到来时去接受和处理,这一切都在调用之前的回调函数里面定下来了,A的任务就是发出一个请求,然后就不管了。这是典型的异步模型。在A->B这个过程中,A不会等待B返回结果,甚至连看一眼都没有看(A并不需要B返回的结果),所以异步必然是非阻塞的。
还有就是如果A使用子线程去发起阻塞请求或者轮询非阻塞请求那么是不是异步请求,也不是,使用子线程只能把阻塞改成非阻塞,A依然依赖B的响应结果,AB之间依然需要“同步”,依然是同步的过程。
所以可以简单地说,同步异步描述的是两个拥有调用关系操作之间的关系,相关(需要状态同步)还是相互独立(调用之后就忘记,不需要同步);阻塞和非阻塞式针对一个操作的描述,需要挂起直到操作结束返回就是阻塞,立即返回结果就是非阻塞。
异步IO
在前端编程中,有ajax的异步请求的编程方式,主要逻辑是发起一个http请求,配置成功和失败的处理逻辑,当请求完成时(成功或者失败),会自行调用配置好的处理函数进行处理。从请求调用方来看,除了在调用之前配置好处理逻辑,在调用后完全没有继续的交互。这是异步编程的主要特征:调用它,然后忘掉它。
一般来说,异步编程都是基于回调的,回调函数简单来说就是预定义的方法,在发生特定情况执行特定的方法。在java编程中典型的类就是xxxListener的模式。
在Java异步IO中,对于Java进程来说,它需要的IO操作就是注册完成后的逻辑处理和发出异步请求,具体的事情由操作系统(底层)去完成,操作系统完成了操作后会通知给java进程,进而执行预定义的回调函数。
AIO和同步IO(BIO和NIO)不同在于,IO操作全部委托给了被调用者(操作系统),在阻塞和非阻塞IO中,不管是使用阻塞流还是使用select选择器,用户进程下一步操作都是依赖操作系统的IO操作结果的,也就是需要同步的。而在AIO中,也就是前面通俗说的,(先写好回调操作)调用系统的IO操作,然后忘记它。
在java中,支持异步模型的方式有两个类:
- Future类
- Callable接口
严格来说,Future不能算是异步模型的类,因为future.get()方法是阻塞的,需要等待处理完成;而Callable是回调,是正宗的异步模型工具。
Java 7异步IO
主要类
java7在NIO中增加了四个异步通道,以支持Java NIO使用异步IO的功能:
- AsynchronousFileChannel: 用于文件异步读写;
- AsynchronousSocketChannel: 客户端异步socket;
- AsynchronousServerSocketChannel: 服务器异步socket。
- AsynchronousDatagramChannel: UDP异步Socket(jdk 7 b118已删除)
异步通道 API 提供两种对已启动异步操作的监测与控制机制。
-
第一种是通过返回一个 java.util.concurrent.Future 对象来实现,它将会建模一个挂起操作,并可用于查询其状态以及获取结果。因为future.get()是阻塞方法,所以使用此模型本质上属于同步过程。
-
第二种是通过传递给操作一个新类的对象,java.nio.channels.CompletionHandler,来完成,它会定义在操作完毕后所执行的处理程序方法。每个异步通道类为每个操作定义 API 副本,这样可采用任一机制。
API
AsynchronousServerSocketChannel
An asynchronous channel for stream-oriented listening sockets.
AsynchronousServerSocketChannel是ServerSocketChannel的异步版本的通道,支持异步处理。
主要使用逻辑和ServerSocket及ServerSocketChannel一样:创建通道,绑定地址,等待连接。
- 打开(创建)通道
public static AsynchronousServerSocketChannel open();
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group);
- bind
将ServerSocket绑定到指定端口,或者绑定到自动分配的端口;
public final AsynchronousServerSocketChannel bind(SocketAddress local);
public abstract AsynchronousServerSocketChannel bind(SocketAddress local, int backlog);
- 接收连接
// 回调形式
public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler);
// Future形式
public abstract Future<AsynchronousSocketChannel> accept();
无论是哪种方式来获取连接,最终的处理对象都是Socket,和ServerSocketChannel不同的是,这里的socket是封装在AsynchronousSocketChannel
中的。
- 示例
基于Future
@Test
public void AsynchronousServerSocketChannel() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
while (true) {
Future<AsynchronousSocketChannel> conn = channel.accept();
// 阻塞等待直到future有结果
AsynchronousSocketChannel asyncSocketChannel = conn.get();
// 异步处理连接
asyncHandle(asyncSocketChannel);
}
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
基于回调
@Test
public void AsynchronousServerSocketChannelCallback() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
System.out.println("accept completed");
// 异步处理连接
asyncHandle(result);
// 继续监听accept
channel.accept(null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("accept failed");
}
});
// 让主线程保持存活
while (true) {
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
AsynchronousChannelGroup异步通道组
每个异步通道都属于一个通道组,它们共享一个 Java 线程池,该线程池用于完成启动的异步 I/O 操作。这看上去有点像欺骗,因为您可在自己的 Java 线程中执行大多数异步功能,来获得相同的表现,并且,您可能希望能够仅仅利用操作系统的异步 I/O 能力,来执行 NIO.2 ,从而获得更优的性能。然而,在有些情况下,有必要使用 Java 线程:比如,保证 completion-handler 方法在来自线程池的线程上执行。
默认情况下,具有 open() 方法的通道属于一个全局通道组,可利用如下系统变量对其进行配置:
-
java.nio.channels.DefaultThreadPoolthreadFactory
,其不采用默认设置,而是定义一个 java.util.concurrent.ThreadFactory -
java.nio.channels.DefaultThreadPool.initialSize
,指定线程池的初始规模
java.nio.channels.AsynchronousChannelGroup 中的三个实用方法提供了创建新通道组的方法:
withCachedThreadPool()
withFixedThreadPool()
withThreadPool()
这些方法或者对线程池进行定义,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下调用创建了具有线程池的新的通道组,该线程池包含 10 个线程,其中每个都构造为来自 Executors 类的线程工厂:
AsynchronousChannelGroup tenThreadGroup =
AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
三个异步网络通道都具有 open() 方法的替代版本,它们采用给出的通道组而不是默认通道组。例如,当有异步操作请求时,此调用告诉 channel 使用 tenThreadGroup 而不是默认通道组来获取线程:
AsynchronousServerSocketChannel channel =
AsynchronousServerSocketChannel.open(tenThreadGroup);
定义自己的通道组可更好地控制服务于操作的线程,并能提供关闭线程或者等待终止的机制。
AsynchronousSocketChannel
和NIO通道是SocketChannel功能相似。
- 打开通道
- 从accept()返回获取已连接的异步SocketChannel
- 使用静态open()方法打开,和AsynchronousServerSocketChannel的open方法一样
- 连接
connect方法:
// 基于回调
public abstract <A> void connect(SocketAddress remote,
A attachment,
CompletionHandler<Void,? super A> handler);
// 基于Future
public abstract Future<Void> connect(SocketAddress remote);
- 读写操作,有多个重载的Future和回调式的read和write方法
public abstract <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
public abstract Future<Integer> read(ByteBuffer dst);
public abstract <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
// write
public abstract <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler);
public abstract Future<Integer> write(ByteBuffer src);
public abstract <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
- 示例
服务器端示例,使用的是accept返回的channel。
// 基于future 实际上是同步的读取方式
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
ByteBuffer dst = ByteBuffer.allocate(1024);
// based on Future,
// 实际上是同步处理的方式,为了不将处理变成阻塞式单连接的socket形式,使用子线程来获取输入流
new Thread(() -> {
while (asyncSocketChannel.isOpen()) {
Future<Integer> readFuture = asyncSocketChannel.read(dst);
try {
// 阻塞等待读取结果
Integer readResult = readFuture.get();
if (readResult > 0) {
System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
} else {
// doOtherthing
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}).start();
}
// 基于回调
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result > 0) {
System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
}
// 注册回调,继续读取输入
asyncSocketChannel.read(dst, null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
// TODO Auto-generated method stub
}
});
}
客户端连接示例
// Future
@Test
public void testasyncSocketChannel() {
AsynchronousSocketChannel asc;
try {
asc = AsynchronousSocketChannel.open();
Future<Void> connResult =
asc.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
// 等待连接成功。连接成功会返回null;
connResult.get();
// 读写操作
ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
asc.write(buf).get();
// TODO
asc.close();
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// callback
@Test
public void testasyncSocketChannelCallback() {
CountDownLatch latch = new CountDownLatch(1);
AsynchronousSocketChannel asc;
try {
asc = AsynchronousSocketChannel.open();
asc.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888), null,
new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
System.out.println("connect complete;");
// 写操作
ByteBuffer buf = ByteBuffer.wrap("hellocallback".getBytes());
asc.write(buf, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("write completed,close channnl");
try {
latch.countDown();
asc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
try {
asc.close();
latch.countDown();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
// TODO Auto-generated method stub
}
});
// 等待回调结束
latch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
AsynchronousFileChannel
@Test
public void testAsynchronousFileChannel() {
Path file = Paths.get("X:\\logs\\debug.log.2018-06-28");
System.out.println("file size:" + file.toFile().length());
// 用于控制主进程不再读文件之前消亡
CountDownLatch latch = new CountDownLatch(1);
try {
// 打开异步文件通道
AsynchronousFileChannel channel =
AsynchronousFileChannel.open(file, StandardOpenOption.READ,
StandardOpenOption.WRITE);
// 异步文件加锁,注意要使用写文件属性才能开启锁
channel.lock(null, new CompletionHandler<FileLock, Void>() {
@Override
public void completed(FileLock result, Void attachment) {
// 锁定成功
ByteBuffer dst = ByteBuffer.allocateDirect((int) file.toFile().length());
// 异步读文件
channel.read(dst, 0, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
try {
System.out.println("read completed");
System.out.println("buffer size:" + dst.limit());
dst.flip();
int bufsize = 64;
byte[] buf = new byte[bufsize];
while (dst.hasRemaining()) {
int getSize = Math.min(dst.remaining(), bufsize);
dst.get(buf, 0, getSize);
// 判断读到文件末尾的buf长度
System.out.print(new String(
getSize == bufsize ? buf
:
Arrays.copyOf(buf, Math.min(dst.remaining(), bufsize)),
StandardCharsets.UTF_8));
}
} finally {
latch.countDown();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
latch.countDown();
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
latch.countDown();
}
});
latch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
参考资料
[1] 在 Java 7 中体会 NIO.2 异步执行的快乐
[2] 异步通道 API