Java IO, NIO, AIO和Netty
2019-11-24 本文已影响0人
bertrand319
背景
最近在回顾Java IO相关的知识,顺带写一篇入门级别的文章。工作以后很少写文章,一直想写点高质量的文章,结果反而连一篇都很难写出来。所以这次不写原理,只写实践,随大流,有问题请留言。(后续有时间再补充原理性的东西,从硬件到操作系统到JVM到JDK)
实现案例
创建一个server,可以接受多个client端的连接,接收到信息后向client返回一条确认收到的信息。
传统IO实现
传统的IO就是我们所说的BIO(blocking I/O,阻塞式IO)。
server端源码如下
package tech.sohocoder.postman.io;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * Blocking-IO (BIO) server.
 *
 * <p>Listens on localhost:9000 and hands every accepted connection to a worker thread taken
 * from a cached thread pool. The worker answers each received message with
 * {@code "Hi Client <message>"} until the client sends {@code "quit"} or disconnects.
 */
public class Server {
    private ServerSocket serverSocket;

    /**
     * Binds the listening socket and accepts connections forever.
     *
     * @throws IOException            if binding or accepting fails
     * @throws ClassNotFoundException kept for backward compatibility of the signature
     */
    private void start() throws IOException, ClassNotFoundException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
        serverSocket = new ServerSocket();
        serverSocket.bind(inetSocketAddress);
        ExecutorService executorService = Executors.newCachedThreadPool(new CaughtExceptionsThreadFactory());
        try {
            while (true) {
                Socket socket = serverSocket.accept();
                System.out.println("accept socket: " + socket.getRemoteSocketAddress());
                // execute() instead of submit(): submit() would capture a failure inside the
                // returned Future, so the uncaught-exception handler would never see it.
                executorService.execute(new SocketHandler(socket));
            }
        } finally {
            // Only reached when accept() fails; release the pool and the listening socket.
            executorService.shutdown();
            serverSocket.close();
        }
    }

    /** Prints the stack trace of any exception that escapes a worker thread. */
    private static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            e.printStackTrace();
        }
    }

    /** Creates pool threads that report uncaught exceptions through {@link MyUncaughtExceptionHandler}. */
    private static class CaughtExceptionsThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = delegate.newThread(r);
            thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
            return thread;
        }
    }

    /** Serves a single client connection on a worker thread. */
    private static class SocketHandler implements Runnable {
        private final Socket socket;

        public SocketHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // try-with-resources closes the socket (and both of its streams) on every exit path.
            try (Socket client = socket) {
                while (true) {
                    // The client opens a fresh ObjectOutputStream per message, so a fresh
                    // ObjectInputStream is needed here to consume the matching stream header.
                    // NOTE(review): native Java deserialization of data from an untrusted peer
                    // is unsafe; use a text protocol or an ObjectInputFilter outside of a demo.
                    ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
                    String message = String.valueOf(ois.readObject());
                    System.out.println("Message Received: " + message);
                    ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
                    //write object to Socket
                    oos.writeObject("Hi Client " + message);
                    oos.flush();
                    if (message.equals("quit")) {
                        break;
                    }
                }
            } catch (java.io.EOFException e) {
                // The client closed the connection without sending "quit"; this is not an error.
                System.out.println("client disconnected: " + socket.getRemoteSocketAddress());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Server server = new Server();
        server.start();
    }
}
client端源码如下
package tech.sohocoder.postman.io;
import java.io.*;
import java.net.Socket;
/**
 * Blocking-IO (BIO) console client.
 *
 * <p>Connects to localhost:9000, sends every non-blank line typed on standard input as a
 * serialized {@code String} and prints the server's reply. Stops on {@code "quit"} or when
 * standard input reaches end-of-file.
 */
public class Client {
    private Socket socket;

    /**
     * Runs the interactive send/receive loop until "quit" or end of input.
     *
     * @throws IOException            if connecting or any socket/console I/O fails
     * @throws ClassNotFoundException if the reply object's class cannot be resolved
     */
    public void start() throws IOException, ClassNotFoundException {
        socket = new Socket("localhost", 9000);
        // try-with-resources closes the socket (and its streams) even when an exception is thrown.
        try (Socket connection = socket) {
            if (connection.isConnected()) {
                System.out.println("socket is connected");
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (;;) {
                    final String input = in.readLine();
                    if (input == null) {
                        // End of standard input: readLine() keeps returning null from now on,
                        // so leaving the loop is the only way to avoid spinning forever.
                        break;
                    }
                    final String line = input.trim();
                    if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                        continue;
                    }
                    // One object stream pair per message; the server mirrors this framing.
                    ObjectOutputStream oos = new ObjectOutputStream(connection.getOutputStream());
                    oos.writeObject(line);
                    oos.flush();
                    ObjectInputStream ois = new ObjectInputStream(connection.getInputStream());
                    System.out.println("Message: " + ois.readObject());
                    if (line.equals("quit")) {
                        break;
                    }
                }
            }
        }
        System.out.println("Bye");
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Client client = new Client();
        client.start();
    }
}
NIO的阻塞实现
NIO实际上就是面向缓冲区(Buffer)及通道(Channel)的新型IO(由JSR 51定义,后来JSR 203进行了扩展,有兴趣可以阅读一下这两个JSR),可以支持阻塞和非阻塞两种方式。先实现一下阻塞方式。
client
package tech.sohocoder.nio.block;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import static java.lang.System.out;
/**
 * Console client built on a blocking NIO {@link SocketChannel}.
 *
 * <p>Connects to localhost:9000, sends every non-blank line typed on standard input and
 * prints the server's reply. Stops on {@code "quit"}, on end of standard input, or when the
 * server closes the connection.
 */
public class Client {
    private SocketChannel socketChannel;

    /**
     * Runs the interactive send/receive loop.
     *
     * @throws IOException if connecting or any channel/console I/O fails
     */
    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        // try-with-resources closes the channel on every exit path, including exceptions.
        try (SocketChannel channel = socketChannel) {
            SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
            channel.connect(socketAddress);
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                if (input == null) {
                    // End of standard input: readLine() returns null forever from here on,
                    // so stop instead of spinning in a busy loop.
                    out.println("quit!");
                    break;
                }
                final String line = input.trim();
                if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
                if (line.equals("quit")) {
                    out.println("quit!");
                    break;
                }
                ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                int readLength = channel.read(returnByteBuffer);
                if (readLength == -1) {
                    // -1 means the server closed the connection; nothing more can be exchanged.
                    out.println("server closed the connection");
                    break;
                }
                // Decode only the bytes that were actually received, not the whole 1 KiB array.
                String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Instantiate this class; the original referred to the client of another package.
        Client client = new Client();
        client.start();
    }
}
server
package tech.sohocoder.nio.block;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * Echo server built on blocking NIO channels.
 *
 * <p>Listens on port 9000 and serves one connection at a time on the calling thread: every
 * received chunk is printed and echoed back until the client sends {@code "quit"} or
 * disconnects. Other clients wait in the accept backlog while a connection is being served.
 */
public class Server {
    private ServerSocketChannel serverSocketChannel;

    /**
     * Binds the server channel and serves connections forever.
     *
     * @throws IOException if binding or accepting fails
     */
    private void start() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);
        while (true) {
            System.out.println("listening...");
            SocketChannel socketChannel = serverSocketChannel.accept();
            // try-with-resources closes the client channel on every exit path.
            try (SocketChannel client = socketChannel) {
                serve(client);
            } catch (IOException e) {
                // A failure on one connection must not take the whole server down.
                e.printStackTrace();
            }
        }
    }

    /** Echoes every received chunk back to the client until "quit" or end-of-stream. */
    private void serve(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(byteBuffer) != -1) {
            // flip() switches the buffer from "being filled" to "being drained"; without it
            // write() would send the unused zero bytes after the data instead of the data.
            byteBuffer.flip();
            String receiveStr = new String(byteBuffer.array(), 0, byteBuffer.limit()).trim();
            System.out.println(receiveStr);
            if (receiveStr.equals("quit")) {
                // The client closes right after sending "quit" and does not wait for a reply.
                break;
            }
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            byteBuffer.clear();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
    }
}
NIO的非阻塞方式
NIO如果需要非阻塞,需要使用到selector。selector是在JDK1.4加入,主要是用于支持IO多路复用,Linux下jdk实现就是基于epoll。
client端代码保持一致。
server端实际上就是使用一个线程来支持多个连接
package tech.sohocoder.nio.noblock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import static java.lang.System.out;
/**
 * Single-threaded server built on non-blocking NIO channels and a {@link Selector}.
 *
 * <p>Listens on port 9000. One thread multiplexes all connections: it accepts new clients,
 * answers every received message with {@code " receive your message"} and closes a
 * connection when the client sends {@code "quit"}, disconnects, or causes an I/O error.
 */
public class Server {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * Binds the server channel, registers it with the selector and runs the event loop forever.
     *
     * @throws IOException          if opening, binding or selecting fails
     * @throws InterruptedException kept for backward compatibility of the signature
     */
    private void start() throws IOException, InterruptedException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);
        serverSocketChannel.configureBlocking(false);
        int opSelectionKey = serverSocketChannel.validOps();
        selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, opSelectionKey);
        out.println(selector);
        out.println(selectionKey);
        while (true) {
            out.println("waiting for connected...");
            selector.select();
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey mySelectionKey = iterator.next();
                // The selected-key set is never cleared by the selector itself.
                iterator.remove();
                if (!mySelectionKey.isValid()) {
                    continue;
                }
                if (mySelectionKey.isAcceptable()) {
                    acceptClient();
                } else if (mySelectionKey.isReadable()) {
                    readClient(mySelectionKey);
                }
            }
        }
    }

    /** Accepts a pending connection and registers it with the selector for read events. */
    private void acceptClient() throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey1 = socketChannel.register(selector, SelectionKey.OP_READ);
        out.println("socket channel selectionkey: " + selectionKey1);
        out.println("connect from : " + socketChannel.getRemoteAddress());
    }

    /** Reads one message from a ready client and replies, or closes the connection. */
    private void readClient(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int readLength = socketChannel.read(byteBuffer);
            if (readLength == -1) {
                // The peer closed the connection. If the channel stayed registered, the key
                // would be reported as readable on every select() and the loop would spin.
                out.println("close connection: " + socketChannel.getRemoteAddress());
                closeClient(key, socketChannel);
                return;
            }
            // Decode only the bytes that were actually received.
            String message = new String(byteBuffer.array(), 0, readLength).trim();
            out.println("Receive message: " + message);
            if (message.equals("quit")) {
                out.println("close connection: " + socketChannel.getRemoteAddress());
                closeClient(key, socketChannel);
            } else {
                ByteBuffer returnByteBuffer = ByteBuffer.wrap(" receive your message".getBytes());
                // NOTE(review): a non-blocking write may be partial; acceptable for this short
                // reply, but a production server would register OP_WRITE for the remainder.
                socketChannel.write(returnByteBuffer);
            }
        } catch (IOException e) {
            // An I/O error on one client (e.g. connection reset) must not stop the server.
            e.printStackTrace();
            closeClient(key, socketChannel);
        }
    }

    /** Cancels the key and closes the channel, reporting (not propagating) close failures. */
    private void closeClient(SelectionKey key, SocketChannel socketChannel) {
        key.cancel();
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}
AIO实现
上面的传统IO和NIO的阻塞方式实际上都是同步阻塞,NIO的非阻塞方式是同步非阻塞。AIO(asynchronous I/O)是异步IO,实现的是异步非阻塞方式,在JDK 1.7中引入。
server端源码如下:
package tech.sohocoder.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static java.lang.System.out;
/**
 * Server built on the asynchronous channel API (AIO, {@code java.nio.channels.Asynchronous*}).
 *
 * <p>Listens on localhost:9000. Connections are accepted through a {@link CompletionHandler};
 * each accepted connection is then served on a pool thread that answers every message with
 * {@code "receive your message"} until the client sends {@code "quit"}, disconnects, or an
 * I/O error occurs.
 */
public class Server {
    private AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    /**
     * Opens the server channel, installs the accept handler and blocks until the channel
     * group terminates.
     *
     * @throws IOException          if opening or binding fails
     * @throws InterruptedException if the wait for group termination is interrupted
     */
    private void start() throws IOException, InterruptedException {
        // worker thread pool
        AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 4);
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
        int port = 9000;
        InetSocketAddress socketAddress = new InetSocketAddress("localhost", port);
        asynchronousServerSocketChannel.bind(socketAddress);
        out.println("Starting listening on port " + port);
        // add handler
        asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object o) {
                // accept next connection before serving this one, so new clients are not delayed
                asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, this);
                // NOTE: waiting on Future.get() keeps one pool thread busy per connection;
                // the cached (unbounded) pool above is what makes that workable here.
                try {
                    out.println("connect from : " + asynchronousSocketChannel.getRemoteAddress());
                    while (true) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        Future<Integer> future = asynchronousSocketChannel.read(byteBuffer);
                        int readLength = future.get();
                        if (readLength == -1) {
                            // The peer closed the connection without "quit"; stop serving it.
                            out.println("client disconnected: " + asynchronousSocketChannel.getRemoteAddress());
                            break;
                        }
                        // Decode only the bytes that were actually received.
                        String message = new String(byteBuffer.array(), 0, readLength).trim();
                        out.println("Receive message: " + message);
                        if (message.equals("quit")) {
                            out.println("close client: " + asynchronousSocketChannel.getRemoteAddress());
                            break;
                        }
                        ByteBuffer returnByteBuffer = ByteBuffer.wrap("receive your message".getBytes());
                        Future<Integer> returnFuture = asynchronousSocketChannel.write(returnByteBuffer);
                        returnFuture.get();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                } catch (ExecutionException | IOException e) {
                    // A failed read/write ends this connection; retrying would fail forever.
                    e.printStackTrace();
                } finally {
                    try {
                        asynchronousSocketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable throwable, Object o) {
                out.println("error to accept: " + throwable.getMessage());
            }
        });
        asynchronousChannelGroup.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}
Netty实现
Netty是java中使用很广泛的库,既可以实现NIO也可以实现AIO,还是针对上面的例子来实现一下
server端
package tech.sohocoder.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import static java.lang.System.out;
/**
 * Netty-based server.
 *
 * <p>Listens on port 9000 with one boss event loop for accepting connections and a worker
 * event-loop group for connection I/O. Each child channel's pipeline decodes inbound bytes
 * to {@code String}, encodes outbound {@code String}s to bytes, and delegates to
 * {@link ServerHandler}.
 */
public class Server {

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @throws InterruptedException if the thread is interrupted while binding or waiting
     */
    private void start() throws InterruptedException {
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // add handler into pipeline
                            socketChannel.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            out.println("listening...");
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Shut down BOTH groups: a worker group left running keeps its threads alive
            // and prevents the JVM from exiting.
            workerEventLoopGroup.shutdownGracefully();
            bossEventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();
        server.start();
    }
}
这里面需要使用到ServerHandler,具体代码如下
package tech.sohocoder.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
import static java.lang.System.out;
/**
 * Channel handler used by the Netty server.
 *
 * <p>Logs when a connection becomes active or inactive, prints every inbound message and
 * answers it with a fixed acknowledgement. Inbound messages are not passed further along
 * the pipeline.
 */
public class ServerHandler extends ChannelDuplexHandler {

    /** Logs the remote address of a newly active channel, then continues the default handling. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String peer = ctx.channel().remoteAddress().toString();
        out.println("connect from: " + peer);
        super.channelActive(ctx);
    }

    /** Prints the inbound message and writes the acknowledgement back to the sender. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        out.println("Receive message: " + msg);
        ctx.writeAndFlush("receive your message");
    }

    /** Logs the remote address of a channel that became inactive, then continues the default handling. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String peer = ctx.channel().remoteAddress().toString();
        out.println("close connection: " + peer);
        super.channelInactive(ctx);
    }

    /** Outbound connect requests are delegated unchanged to the parent implementation. */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        super.connect(ctx, remoteAddress, localAddress, promise);
    }
}
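client端也用netty写一下(注意:下面贴出的代码实际上是基于NIO SocketChannel的阻塞式客户端,并没有使用Netty的Bootstrap;由于server端使用的是StringDecoder/StringEncoder,它同样可以连接上面的Netty server)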
package tech.sohocoder.aio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import static java.lang.System.out;
/**
 * Console client built on a blocking NIO {@link SocketChannel}.
 *
 * <p>Connects to localhost:9000, sends every non-blank line typed on standard input and
 * prints the server's reply. Stops on {@code "quit"}, on end of standard input, or when the
 * server closes the connection.
 */
public class Client {
    private SocketChannel socketChannel;

    /**
     * Runs the interactive send/receive loop.
     *
     * @throws IOException if connecting or any channel/console I/O fails
     */
    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        // try-with-resources closes the channel on every exit path, including exceptions.
        try (SocketChannel channel = socketChannel) {
            SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
            channel.connect(socketAddress);
            if (channel.isConnected()) {
                out.println("connect to " + channel.getRemoteAddress());
            }
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                if (input == null) {
                    // End of standard input: readLine() returns null forever from here on,
                    // so stop instead of spinning in a busy loop.
                    out.println("quit!");
                    break;
                }
                final String line = input.trim();
                if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }
                if (line.equals("quit")) {
                    out.println("quit!");
                    break;
                }
                ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                int readLength = channel.read(returnByteBuffer);
                if (readLength == -1) {
                    // -1 means the server closed the connection; nothing more can be exchanged.
                    out.println("server closed the connection");
                    break;
                }
                // Decode only the bytes that were actually received, not the whole 1 KiB array.
                String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
同样要实现一个ClientHandler
package tech.sohocoder.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import static java.lang.System.out;
/**
 * Inbound handler for a Netty client: prints every {@code String} message it receives.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /** Prints the received message to standard output, prefixed with "Receive message: ". */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        final String logLine = "Receive message: " + s;
        out.println(logLine);
    }
}