Java IO, NIO, AIO和Netty

2019-11-24  本文已影响0人  bertrand319

背景

最近在回顾Java IO相关的知识,顺带写一篇入门级别的文章。工作以后很少写文章,总想写出高质量的文章,结果反而连一篇都很难写出来。所以这次不写原理,只写实践,随大流,有问题请留言。(后续有时间再补充原理性的内容,从硬件到操作系统,再到JVM和JDK)

实现案例

创建一个server,可以接受多个client端的连接,收到client发来的消息后返回一条确认消息。

传统IO实现

传统的IO就是我们所说的BIO(blocking I/O),即同步阻塞IO。

server端源码如下

package tech.sohocoder.postman.io;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Blocking-I/O (BIO) server that answers every message with {@code "Hi Client " + message}.
 *
 * <p>Each accepted connection is served on its own pooled thread. Both directions use a fresh
 * object stream per message (one {@link ObjectInputStream} per request, one
 * {@link ObjectOutputStream} per reply). The message {@code "quit"} ends a session.
 */
public class Server {

    private ServerSocket serverSocket;

    /**
     * Binds to localhost:9000 and dispatches every accepted socket to a cached thread pool.
     * The accept loop has no exit condition; the method only leaves through an exception.
     *
     * @throws IOException if binding or accepting fails
     * @throws ClassNotFoundException never thrown here; kept so the signature stays unchanged
     */
    private void start() throws IOException, ClassNotFoundException {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
        serverSocket = new ServerSocket();
        serverSocket.bind(inetSocketAddress);
        ExecutorService executorService = Executors.newCachedThreadPool(new CaughtExceptionsThreadFactory());
        try {
            while (true) {
                Socket socket = serverSocket.accept();
                System.out.println("accept socket: " + socket.getRemoteSocketAddress());
                // execute() instead of submit(): submit() stores a task's Throwable in the returned
                // Future, so the thread's UncaughtExceptionHandler would never be invoked.
                executorService.execute(new SocketHandler(socket));
            }
        } finally {
            // Reached only when accept() fails: stop taking work and release the listening port.
            executorService.shutdown();
            serverSocket.close();
        }
    }

    /** Prints the stack trace of any throwable that escapes a worker thread. */
    private static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            e.printStackTrace();
        }
    }

    /** Creates default pool threads and installs {@link MyUncaughtExceptionHandler} on each one. */
    private static class CaughtExceptionsThreadFactory implements ThreadFactory {

        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        private final Thread.UncaughtExceptionHandler handler = new MyUncaughtExceptionHandler();

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = delegate.newThread(r);
            thread.setUncaughtExceptionHandler(handler);
            return thread;
        }
    }

    /** Serves one client connection until it sends {@code "quit"} or disconnects. */
    private static class SocketHandler implements Runnable {

        private final Socket socket;

        public SocketHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // try-with-resources closes the socket (and with it both streams) on every exit path.
            try (Socket client = socket) {
                while (true) {
                    // NOTE(security): Java-native deserialization of data read from the network is
                    // unsafe for untrusted peers; a real service should use a plain-text or
                    // schema-based protocol, or at least an ObjectInputFilter.
                    ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
                    String message = String.valueOf(ois.readObject());
                    System.out.println("Message Received: " + message);
                    ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
                    //write object to Socket
                    oos.writeObject("Hi Client " + message);
                    oos.flush();
                    if (message.equals("quit")) {
                        break;
                    }
                }
            } catch (java.io.EOFException e) {
                // The peer closed the connection without sending "quit"; nothing left to serve.
                System.out.println("client disconnected: " + socket.getRemoteSocketAddress());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Server server = new Server();
        server.start();
    }
}

client端源码如下

package tech.sohocoder.postman.io;

import java.io.*;
import java.net.Socket;

/**
 * Console client for the blocking-I/O server: each non-blank line typed on stdin is sent as one
 * serialized object and the server's reply is printed. Typing {@code quit} ends the session.
 */
public class Client {

    private Socket socket;

    /**
     * Connects to localhost:9000 and runs the read-line / send / print-reply loop until the user
     * types {@code quit} or stdin reaches end-of-file.
     *
     * @throws IOException if connecting, sending or receiving fails
     * @throws ClassNotFoundException if the reply object's class cannot be resolved
     */
    public void start() throws IOException, ClassNotFoundException {
        socket = new Socket("localhost", 9000);
        // try-with-resources closes the socket (and its streams) on every exit path.
        try (Socket connection = socket) {
            System.out.println("socket is connected");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                if (input == null) {
                    // End of stdin: readLine() keeps returning null, so looping again would spin forever.
                    break;
                }
                final String line = input.trim();
                if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                // A new object stream per message, mirroring what the server does on its side.
                ObjectOutputStream oos = new ObjectOutputStream(connection.getOutputStream());
                oos.writeObject(line);
                oos.flush();
                ObjectInputStream ois = new ObjectInputStream(connection.getInputStream());
                System.out.println("Message: " + ois.readObject());
                if (line.equals("quit")) {
                    break;
                }
            }
        }
        System.out.println("Bye");
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Client client = new Client();
        client.start();
    }
}

NIO的阻塞实现

NIO实际上就是面向缓冲区(Buffer)及通道(Channel)的新型IO(由JSR 51定义,后来JSR 203对其进行了扩展,有兴趣可以阅读一下这两个JSR),可以支持阻塞和非阻塞两种方式。这里先实现阻塞方式。

client

package tech.sohocoder.nio.block;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

/**
 * Console client built on a blocking {@link SocketChannel}: every non-blank stdin line is sent
 * as raw bytes and one reply is read and printed. Typing {@code quit} ends the session.
 */
public class Client {

    private SocketChannel socketChannel;

    /**
     * Connects to localhost:9000 and runs the send / receive loop until the user types
     * {@code quit}, stdin reaches end-of-file, or the server closes the connection.
     *
     * @throws IOException if connecting, writing or reading fails
     */
    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        // try-with-resources closes the channel on every exit path, including exceptions.
        try (SocketChannel channel = socketChannel) {
            SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
            channel.connect(socketAddress);

            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                if (input == null) {
                    // End of stdin: readLine() keeps returning null, so looping again would spin forever.
                    break;
                }
                final String line = input.trim();
                if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }

                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                // A single write() may send only part of the buffer.
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }

                if (line.equals("quit")) {
                    out.println("quit!");
                    break;
                }

                ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                int readLength = channel.read(returnByteBuffer);
                if (readLength == -1) {
                    out.println("server closed the connection");
                    break;
                }
                // Decode only the bytes actually received, not the whole zero-padded array.
                String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Run this package's client; the original instantiated the class from the noblock package.
        Client client = new Client();
        client.start();
    }
}

server

package tech.sohocoder.nio.block;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Echo server built on blocking NIO channels.
 *
 * <p>Everything runs on the calling thread, so clients are served strictly one at a time: the
 * next {@code accept()} happens only after the current client sent {@code "quit"} or disconnected.
 */
public class Server {

    private ServerSocketChannel serverSocketChannel;

    /**
     * Binds to port 9000 and serves clients sequentially. The loop has no exit condition; the
     * method only leaves when binding or accepting throws.
     *
     * @throws IOException if the server channel cannot be opened, bound or accepted on
     */
    private void start() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        while (true) {
            System.out.println("listening...");
            SocketChannel accepted = serverSocketChannel.accept();
            try (SocketChannel socketChannel = accepted) {
                serve(socketChannel);
            } catch (IOException e) {
                // A failure on one connection must not take the whole server down.
                e.printStackTrace();
            }
        }
    }

    /**
     * Echoes every chunk received from the client until it sends {@code "quit"} or closes the
     * connection.
     */
    private void serve(SocketChannel socketChannel) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(byteBuffer) != -1) {
            // Switch from filling to draining; without flip() write() would send the unused,
            // zero-filled tail of the buffer instead of the data that was just read.
            byteBuffer.flip();
            String receiveStr = new String(byteBuffer.array(), 0, byteBuffer.limit()).trim();
            System.out.println(receiveStr);
            if (receiveStr.equals("quit")) {
                return;
            }
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            byteBuffer.clear();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.start();
    }
}

NIO的非阻塞方式

NIO如果需要非阻塞,需要使用到selector。selector是在JDK1.4加入,主要是用于支持IO多路复用,Linux下jdk实现就是基于epoll。

client端代码保持一致。

server端实际上就是使用一个线程来支持多个连接

package tech.sohocoder.nio.noblock;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import static java.lang.System.out;

/**
 * Single-threaded server that multiplexes all connections over one {@link Selector}.
 *
 * <p>Every message is answered with {@code " receive your message"}; the message {@code "quit"}
 * closes that client's connection.
 */
public class Server {

    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    /**
     * Binds to port 9000, registers the server channel for accept events and runs the select
     * loop. The loop has no exit condition; the method only leaves through an exception.
     *
     * @throws IOException if the server channel or selector fails
     * @throws InterruptedException never thrown here; kept so the signature stays unchanged
     */
    private void start() throws IOException, InterruptedException {
        serverSocketChannel = ServerSocketChannel.open();
        SocketAddress socketAddress = new InetSocketAddress(9000);
        serverSocketChannel.bind(socketAddress);

        serverSocketChannel.configureBlocking(false);

        int opSelectionKey = serverSocketChannel.validOps();

        selector = Selector.open();

        SelectionKey selectionKey = serverSocketChannel.register(selector, opSelectionKey);

        out.println(selector);
        out.println(selectionKey);
        while (true) {
            out.println("waiting for connected...");
            selector.select();
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey mySelectionKey = iterator.next();
                // The selector never clears the selected-key set itself; remove each key as it is handled.
                iterator.remove();
                if (!mySelectionKey.isValid()) {
                    continue;
                }
                if (mySelectionKey.isAcceptable()) {
                    acceptClient();
                } else if (mySelectionKey.isReadable()) {
                    readClient(mySelectionKey);
                }
            }
        }

    }

    /** Accepts a pending connection and registers it with the selector for read events. */
    private void acceptClient() throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // A non-blocking accept() returns null when no connection is actually pending.
            return;
        }
        socketChannel.configureBlocking(false);
        SelectionKey selectionKey1 = socketChannel.register(selector, SelectionKey.OP_READ);
        out.println("socket channel selectionkey: " + selectionKey1);
        out.println("connect from : " + socketChannel.getRemoteAddress());
    }

    /**
     * Reads one chunk from a ready client and replies to it. The connection is closed when the
     * client sent {@code "quit"}, has disconnected, or an I/O error occurred on it.
     */
    private void readClient(SelectionKey mySelectionKey) {
        SocketChannel socketChannel = (SocketChannel) mySelectionKey.channel();
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int readLength = socketChannel.read(byteBuffer);
            if (readLength == -1) {
                // End of stream: the peer went away without "quit". If the channel stayed
                // registered it would be reported readable on every select() from now on.
                out.println("close connection: " + socketChannel.getRemoteAddress());
                closeClient(mySelectionKey, socketChannel);
                return;
            }
            if (readLength == 0) {
                return;
            }
            // Decode only the bytes actually received, not the whole zero-padded array.
            String message = new String(byteBuffer.array(), 0, readLength).trim();
            out.println("Receive message: " + message);
            if (message.equals("quit")) {
                out.println("close connection: " + socketChannel.getRemoteAddress());
                closeClient(mySelectionKey, socketChannel);
            } else {
                ByteBuffer returnByteBuffer = ByteBuffer.wrap(" receive your message".getBytes());
                socketChannel.write(returnByteBuffer);
            }
        } catch (IOException e) {
            // E.g. connection reset: drop this client only, keep serving the others.
            e.printStackTrace();
            closeClient(mySelectionKey, socketChannel);
        }
    }

    /** Cancels the key and closes the channel, reporting (not propagating) a failure to close. */
    private void closeClient(SelectionKey key, SocketChannel socketChannel) {
        key.cancel();
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

AIO实现

上面的传统IO和NIO的阻塞方式实际上都是同步阻塞,NIO的非阻塞方式是同步非阻塞。AIO(asynchronous I/O)是异步IO,实现的是异步非阻塞方式,在JDK 1.7中引入。

server端源码如下:

package tech.sohocoder.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.lang.System.out;

/**
 * Server built on {@link AsynchronousServerSocketChannel}.
 *
 * <p>Connections are accepted through a {@link CompletionHandler}. Each accepted connection is
 * then served inside that callback with future-based reads and writes, so one thread of the
 * channel group stays occupied per client. Every message is answered with
 * {@code "receive your message"}; {@code "quit"} ends the session.
 */
public class Server {

    private AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    /**
     * Binds to localhost:9000, installs the accept handler and then waits for the channel group
     * to terminate.
     *
     * @throws IOException if the group or server channel cannot be created or bound
     * @throws InterruptedException if interrupted while waiting for the group to terminate
     */
    private void start() throws IOException, InterruptedException {
        // worker thread pool
        AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 4);
        asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
        int port = 9000;
        InetSocketAddress socketAddress = new InetSocketAddress("localhost", port);
        asynchronousServerSocketChannel.bind(socketAddress);

        out.println("Starting listening on port " + port);
        // add handler
        asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object o) {

                try {
                    out.println("connect from : " + asynchronousSocketChannel.getRemoteAddress());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // accept next connection
                asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, this);
                serve(asynchronousSocketChannel);
            }

            @Override
            public void failed(Throwable throwable, Object o) {
                out.println("error to accept: " + throwable.getMessage());
            }
        });
        asynchronousChannelGroup.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    }

    /**
     * Runs the request/reply loop for one client. The loop ends, and the channel is closed, when
     * the client sends {@code "quit"}, disconnects, or any read/write fails.
     */
    private void serve(AsynchronousSocketChannel asynchronousSocketChannel) {
        // try-with-resources closes the channel on every exit path.
        try (AsynchronousSocketChannel channel = asynchronousSocketChannel) {
            while (true) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                Future<Integer> future = channel.read(byteBuffer);
                int readLength = future.get();
                if (readLength == -1) {
                    // End of stream: the peer is gone. Reading again would return -1 immediately,
                    // forever, so the session must end here.
                    out.println("close client: " + channel.getRemoteAddress());
                    break;
                }
                // Decode only the bytes actually received, not the whole zero-padded array.
                String message = new String(byteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
                if (message.equals("quit")) {
                    out.println("close client: " + channel.getRemoteAddress());
                    break;
                }

                ByteBuffer returnByteBuffer = ByteBuffer.wrap("receive your message".getBytes());
                // A single write may complete after sending only part of the buffer.
                while (returnByteBuffer.hasRemaining()) {
                    Future<Integer> returnFuture = channel.write(returnByteBuffer);
                    returnFuture.get();
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this pool thread.
            Thread.currentThread().interrupt();
        } catch (ExecutionException | IOException e) {
            // The connection is unusable after a failed read/write; report it and end the session.
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = new Server();
        server.start();
    }
}

Netty实现

Netty是java中使用很广泛的库,既可以实现NIO也可以实现AIO,还是针对上面的例子来实现一下

server端

package tech.sohocoder.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import static java.lang.System.out;

/**
 * Netty-based server: a one-thread boss group accepts connections and a worker group handles
 * their I/O. Each child pipeline is StringDecoder, then StringEncoder, then {@code ServerHandler}.
 */
public class Server {

    /**
     * Binds to port 9000 and blocks until the server channel is closed, then shuts down both
     * event loop groups.
     *
     * @throws InterruptedException if interrupted while waiting for the bind or the close
     */
    private void start() throws InterruptedException {
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // add handler into pipeline
                            socketChannel.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            out.println("listening...");
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Both groups own threads; the original released only the boss group and leaked
            // the worker group.
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();
        server.start();
    }

}

这里面需要使用到ServerHandler,具体代码如下

package tech.sohocoder.netty;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.net.SocketAddress;

import static java.lang.System.out;

/**
 * Server-side pipeline handler: logs connection open/close and every inbound message, and
 * answers each message with a fixed acknowledgement string.
 */
public class ServerHandler extends ChannelDuplexHandler {

    /** Prints the inbound message, then writes and flushes the acknowledgement. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String logLine = "Receive message: " + msg;
        out.println(logLine);
        ctx.writeAndFlush("receive your message");
    }

    /** Pure pass-through to the default outbound connect handling. */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    /** Logs the peer address when the channel becomes active, then propagates the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String peer = remoteOf(ctx);
        out.println("connect from: " + peer);
        super.channelActive(ctx);
    }

    /** Logs the peer address when the channel becomes inactive, then propagates the event. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String peer = remoteOf(ctx);
        out.println("close connection: " + peer);
        super.channelInactive(ctx);
    }

    /** Returns the textual form of the channel's remote address. */
    private static String remoteOf(ChannelHandlerContext ctx) {
        return ctx.channel().remoteAddress().toString();
    }

}

client端也用netty写一下(注意:下面贴出的这段代码位于 tech.sohocoder.aio 包,实际使用的是 JDK NIO 的 SocketChannel,并不是基于 Netty 的实现)

package tech.sohocoder.aio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import static java.lang.System.out;

/**
 * Console client built on a blocking {@link SocketChannel}: every non-blank stdin line is sent
 * as raw bytes and one reply is read and printed. Typing {@code quit} ends the session.
 */
public class Client {

    private SocketChannel socketChannel;

    /**
     * Connects to localhost:9000 and runs the send / receive loop until the user types
     * {@code quit}, stdin reaches end-of-file, or the server closes the connection.
     *
     * @throws IOException if connecting, writing or reading fails
     */
    public void start() throws IOException {
        socketChannel = SocketChannel.open();
        // try-with-resources closes the channel on every exit path, including exceptions.
        try (SocketChannel channel = socketChannel) {
            SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
            channel.connect(socketAddress);
            if (channel.isConnected()) {
                out.println("connect to " + channel.getRemoteAddress());
            }

            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                if (input == null) {
                    // End of stdin: readLine() keeps returning null, so looping again would spin forever.
                    break;
                }
                final String line = input.trim();
                if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }

                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                // A single write() may send only part of the buffer.
                while (byteBuffer.hasRemaining()) {
                    channel.write(byteBuffer);
                }

                if (line.equals("quit")) {
                    out.println("quit!");
                    break;
                }

                ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                int readLength = channel.read(returnByteBuffer);
                if (readLength == -1) {
                    out.println("server closed the connection");
                    break;
                }
                // Decode only the bytes actually received, not the whole zero-padded array.
                String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}

同样要实现一个ClientHandler

package tech.sohocoder.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import static java.lang.System.out;

/** Client-side inbound handler that prints every decoded {@code String} message it receives. */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    private static final String LOG_PREFIX = "Receive message: ";

    /** Writes the received text to standard output, prefixed with a fixed label. */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        final String logLine = LOG_PREFIX + s;
        out.println(logLine);
    }
}

上一篇下一篇

猜你喜欢

热点阅读