Netty并发(个人收集)

想要彻底理解Netty,你需要先搞明白NIO

2019-11-23  本文已影响0人  iDevOps

I/O模型

I/O模型简单理解就是用什么样的通道进行数据的发生和接收,这个很大程度上觉得了程序通信的性能。
Java有三种网络编程模型,分别是BIO、NIO和NIO。

BIO
同步并阻塞,一个连接一个线程,即客户端有请求服务端就会开启一个线程进行处理。缺点就是如果这个连接不做任何事情就会造成不必要的线程开销。


BIO示意图

NIO
同步非阻塞,一个线程处理多个请求,客户端请求会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。


NIO示意图

AIO
异步非阻塞IO,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动程序去处理,一般适合用于连接数较多且连接时间较长的应用。不常用,所以这里不做过多讲解。

三种IO模型的应用场景

BIO适合于链接数目较少且固定的架构,这种方式对服务器的资源要求较高,JDK1.4之前的唯一选择,程序简单易理解。

NIO适合于连接数目较多且连接比较短的架构,比如聊天服务器、服务期间通信等,编程较复杂。JDK1.4新增。

AIO适用于连接数多且连接时间较长的架构。编程较复杂,JDK1.7开始支持。

BIO

Java BIO就是传统的Java io编程,相关类和接口都在java.io包下。前面已经说过,它是同步阻塞的,一个连接就启动一个线程,比较浪费服务器资源。

需求: 使用BIO实现一个服务器端,监听10000端口,当客户端连接时,就启动一个线程与之通讯,要求使用线程池。

public class BioServer {

    public static void main(String[] args) throws Exception {
        //1.创建一个线程池
        ExecutorService pool = Executors.newCachedThreadPool();
        //2.如果有请求, 就创建一个线程与之通信
        //创建服务端
        ServerSocket serverSocket = new ServerSocket(10000);
        System.out.println("服务器启动.");
        while (true){
            System.out.println("等待连接......");
            final Socket socket = serverSocket.accept();
            System.out.println("有新连接");
            //创建一个线程与之通信
            pool.execute(new Runnable() {
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    /**
     * 处理客户端请求
     * @param socket
     */
    public static void handler(Socket socket){
        System.out.println("线程id: "+ Thread.currentThread().getId());
        System.out.println("线程name: "+ Thread.currentThread().getName());
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();
            while (true){
                int read = inputStream.read(bytes);
                if(read != -1){
                    //输出客户端发送的数据
                    System.out.println(new String(bytes, 0, read));
                }else{
                    break;
                }
            }

        }catch (Exception e){

        }finally {
            try {
                System.out.println("关闭与客户端的连接");
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

NIO

Java non-blocking IO的缩写,同步非阻塞,JDK1.4开始提供。
相关类和接口放在java.nio包下。

NIO三大核心概念

Channel(通道)、Buffer(缓冲区)、Selector(选择器)

NIO是面向缓冲区的编程

请求数据会先读取到一个缓冲区,需要时可以在缓冲区中前后移动,这样增加了处理过程的灵活性。

NIO是非阻塞的

请求要写入一些数据到某个通道,不需要等待写完,这个线程就可以去干别的事去。

NIO vs BIO

1.BIO以流的方式处理数据,NIO以块的方式处理数据,块I/O的效率比流I/O的效率高很多
2.BIO是阻塞的,NIO是非阻塞的
3.BIO基于字节流和字符流进行操作,NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区或从缓冲区写入到通道,Selector(选择器)用于监听多个通道的事件,比如连接请求、数据到达等,因此使用单个线程就可以监听多个客户端通道。

Channel、Buffer和Selector的关系

1.一个线程对应一个Selector
2.一个Channel对应一个Buffer
3.Selector会根据不同的事件,在各个Channel上切换
4.Buffer就是一个内存块,底层是一个数组
5.数据的读取写入都是通过Buffer,Channel是双向的。


Channel、Buffer和Selector的关系图
缓冲区(Buffer)

缓冲区的本质其实就是一个可以读写数据的内存块,可以理解成一个容器对象,该对象提供了一系列方法,可以轻松的使用内存块。

通道(Channel)

通道可以同时进行读写,流只能读或只能写,通道可以实现异步读取数据,通道可以从缓冲读取数据,也可以写数据到缓冲。
Channel是一个接口,常用的实现类有FileChannel、DatagramChannel、ServerSocketChannel、SocketChannel等

用于本地文件的数据读写

常用方法
read : 从通道读取数据并放入缓冲区
write : 把缓冲区的数据写入到通道
transferFrom : 从目标通道复制数据到当前通道
transferTo : 把数据从当前通道复制给目标通道

需求: 使用ByteBuffer和FileChannel将"Hello Nio"写入到a.txt文件中

public static void main(String[] args) throws Exception{

    //1.创建输出流
    String str = "Hello Nio";
    FileOutputStream fileOutputStream = new FileOutputStream("d:\\a.txt");

    //2.获取对应的FileChannel
    FileChannel channel = fileOutputStream.getChannel();

    //3.创建一个缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    byteBuffer.put(str.getBytes());
    byteBuffer.flip();

    //4.将缓冲区的数据写入到FileChannel
    channel.write(byteBuffer);
    fileOutputStream.close();

}

需求:使用ByteBuffer和FileChannel,将a.txt文件中的输出到控制台

public static void main(String[] args) throws  Exception{
    File file = new File("d:\\a.txt");
    FileInputStream fis = new FileInputStream(file);
    FileChannel channel = fis.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
    channel.read(buffer);
    System.out.println(new String(buffer.array()));
    fis.close();
}

需求: 使用FileChannel和方法write和read完成文件拷贝

public static void main(String[] args) throws Exception{
    FileInputStream fileInputStream = new FileInputStream("d:\\a.txt");
    FileOutputStream fileOutputStream = new FileOutputStream("d:\\b.txt");
    FileChannel channel1 = fileInputStream.getChannel();
    FileChannel channel2 = fileOutputStream.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(512);
    while (true){
        //清空buffer
        buffer.clear();
        int read = channel1.read(buffer);
        if(read == -1){
            break;
        }
        //将buffer中的数据写入到channel2
        buffer.flip();
        channel2.write(buffer);
    }
    fileInputStream.close();
    fileOutputStream.close();
}

需求: 使用FileChannel和方法transferFrom完成文件拷贝

public static void main(String[] args) throws Exception {
    FileInputStream fileInputStream = new FileInputStream("d:\\a.txt");
    FileOutputStream fileOutputStream = new FileOutputStream("d:\\b.txt");
    FileChannel channel1 = fileInputStream.getChannel();
    FileChannel channel2 = fileOutputStream.getChannel();
    channel2.transferFrom(channel1, 0, channel1.size());
    channel1.close();
    channel2.close();
    fileInputStream.close();
    fileOutputStream.close();
}

用于UDP的数据读写

用于TCP数据的读写

Selector(选择器)

一个线程处理多个客户单连接,就会使用Selector,Selector能检测多个注册的通道上是否有事件发生,如果事件发送,便获取事件然后对事件进行相应的处理。并且还可以避免多线程之间上下文切换导致的开销。

open() : 得到一个选择器对象
select() : 一直阻塞,监控所有注册的通道,当其中有IO操作,将对应的SelectionKey加入到内部集合并返回
select(1000) : 阻塞1000毫秒
selectedKeys() : 从内部集合中得到所以的SelectionKey
wakeup() : 唤醒selector
selectNow() : 不阻塞,立马返回

注: 这些方法的使用后面都会有具体的案例讲解。

Nio入门案例

需求:实现服务端和客户端之间的数据通信(非阻塞)

在案例开始先说几个概念

在服务端监听新的客户端Socket连接

方法
open() : 得到一个ServerSocketChannel通道
configureBlocking(boolean block) : 设置阻塞和非阻塞模式,false为非阻塞
accept() : 接受一个连接,返回代表这个连接的通道对象
register() : 注册一个选择器并设置监听事件

网络IO通道,负责读写操作

方法
open() : 获取一个SocketChannel通道
configureBlocking(boolean block) : 设置阻塞和非阻塞模式,false为非阻塞
connect(SocketAddress remote) : 连接服务器
finishConnect() : 如果连接失败,使用该方法继续完成连接
write(ByteBuffer src) : 往通道里写数据
read(ByteBuffer dst) : 从通道里读数据
register(Selector sel, int ops, Object att) : 注册一个选择器并指定监听事件,最后一个参数设置共享数据
close() : 关闭通道

表示Selector和网络通道的注册关系,共有4种:
OP_ACCEPT : 有新的连接可以accept
OP_CONNECT : 代表连接已建立
OP_READ : 读操作
OP_WRITE : 写操作

方法
selector() : 得到与之关联的Selector对象
channel() : 得到与之关联的通道
attachment() : 得到与之关联的Buffer
isAcceptable() : 是否可以accept
isReadable() : 是否可以读
isWritable() : 是否可以写

public class NioServer {

    public static void main(String[] args) throws Exception{
        //1.创建
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //2.创建Selector
        Selector selector = Selector.open();

        //3.绑定端口并监听端口
        serverSocketChannel.socket().bind(new InetSocketAddress(10000));
        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        //4.把serverSocketChannel注册到selector上, 并监听OP_ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //5.循环等待客户端连接
        while (true){
            //等待1秒
            int select = selector.select(1000);
            //等于0, 表示没有事件发送, 就返回
            if (select == 0){
                System.out.println("服务器等待了1秒, 无连接");
                continue;
            }
            //返回>0, 说明获取到事件, 获取所有事件的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //遍历集合
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()){
                //获取SelectionKey, 并根据SelectionKey对应的通道所发生的事件类型做对应处理
                SelectionKey key = keyIterator.next();
                //OP_ACCEPT, 有新的客户端连接了
                if(key.isAcceptable()){
                    //为该客户端生成一个SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("有新客户端: "+socketChannel.hashCode());
                    //将SocketChannel设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将SocketChannel注册到selector, 监听时间为OP_READ, 同时给SocketChannel关联一个Buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                //OP_READ, 有新消息
                if(key.isReadable()){
                    //根据key获取对应的channel
                    SocketChannel channel = (SocketChannel)key.channel();
                    //根据key获取对应的buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    channel.read(buffer);
                    System.out.println("客户单消息: "+ new String(buffer.array()));
                }
                //手动从集合中移除当前的selectionKey, 防止重复操作
                keyIterator.remove();
            }
        }
    }

}
public class NioClient {

    public static void main(String[] args) throws Exception {

        //1.创建一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //2.连接服务器
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 10000);
        if(!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("正在连接服务器中....");
            }
        }
        //3.连接成功, 发送数据
        String str = "Hello NioServer";
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(buffer);
        System.in.read();

    }

}

就整理这么多吧

上一篇下一篇

猜你喜欢

热点阅读