Netty挖掘机

Netty挖掘机(一)初识Java Nio

2019-11-08  本文已影响0人  进击的阿黑

推荐阅读:美团技术团队对NIO的浅析

众所周知,Netty是基于JAVA NIO 而封装的网络通讯框架。

官网介绍:Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

本人打着想重新梳理一遍Netty的知识体系的心态,来写Netty挖掘机这一系列的文章。我相信一步步深入到源码后,会有收获的!

什么是阻塞和非阻塞、同步和异步IO

一个IO操作分为先发起IO请求、再进行实际的IO操作

​ 同步需要等待(线程阻塞时)或轮询(线程非阻塞时)内核完成IO操作

​ 异步是系统等内核完成IO操作后(被调用者)再主动通知程序(通知 或者 回调)

为什么说NIO是同步非阻塞?

NIO它的实际内核I/O操作(读read、写write、接受accept)会阻塞请求线程,所以是同步的;但是它的发起请求这一步,不会导致线程阻塞,而会通过多路复用器select进行轮询获取实际内核I/O操作完成的信息

什么是BIO?

即面向流的同步阻塞IO, Client、Server基于输入流、输出流进行通信

单线程BIO

bio_single.png

多线程版BIO

典型的C/S模型。它是由Server开启accept线程进行监听Client的连接请求,一个请求对应创建一个新的线程进行处理,并通过输出流返回给客户端。

​ 优点:支持同一时间内多个客户端并行请求服务端

​ 缺点:不具备弹性伸缩能力。当面对海量连接时,意味着线程数膨胀,与其同时造成的是系统性能的急剧下降(因为线程也是宝贵的系统资源),进而会发生句柄溢出、线程堆栈溢出,甚至造成服务器宕机。

bio1.png

​ 代码示例:

public void start(int port) throws IOException {
    ServerSocket server = new ServerSocket(port);
    System.out.println("# Server start,the port:" + port);
    
    System.out.println("### start listener client connect...");
    Socket client = server.accept();
    System.out.println("### now accept one client connect, start handler...");
    new Thread(() -> {
        handler(client);
    }).start();
}
private void handler(Socket client) {
    try {
        InputStream stream = client.getInputStream();
        while (true) {
            byte[] bytes = new byte[1024];
            int len = stream.read(bytes);
            ...
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

线程池版BIO

在多线程的基础上,增加了线程池的概念,可以避免线程频繁创建。

线程池本身就是一个天然的漏斗,可以在特殊情况下解决一些系统无法处理的问题

然而还是逃不开一个事实:把线程当做命根子。。。

bio_pool.png
public void start(int port) {
    // 创建容量为100的线程池
    ExecutorService executor = Executors.newFixedThreadPool(100);
    try {
        ServerSocket server = new ServerSocket(port);
        System.out.println("# Server start,the port:" + port);
        while (! Thread.currentThread().isInterrupted()) {// 循环等待新连接
            Socket client = server.accept();
            executor.submit(new Thread(() -> {
                handler(client);
            }));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private void handler(Socket client) {
    try {
        InputStream stream = client.getInputStream();
        while (true) {
            byte[] bytes = new byte[1024];
            int len = stream.read(bytes);
            ...
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

什么是NIO

即面向缓冲区的同步非阻塞IO。

从jdk1.4开始,增加了与原标准IO API 不同使用方式的NIO(NO BLOCK IO)

jdk1.5_update10 版本使用 epoll 替代了传统的 select/poll,极大的提升了 NIO 通信的性能。

传输方式

​ 在BIO中使用字节流or字符流来传输数据;而在NIO中使用通道和缓冲区来传输数据

​ Channel将数据读入Buffer, Buffer将数据写入Channel

channelbuffer.png

核心模块

Channel

即数据通道,不同于单向io(读和写需要不同的通道),这里的通道支持双向,即可从通道内读写数据。

实际Channel与Buffer结合使用:从通道读数据到缓冲区,缓冲区向通道写入数据

JAVA NIO Channel 主要有以下几种 channel.jpg

以下是使用FileChannel 读取文件的Demo

public static void main(String[] args) {
    StringBuffer json = null;
    try(FileInputStream fis = new FileInputStream("test.json");
        FileChannel fc = fis.getChannel()) {
        json = new StringBuffer();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int br = fc.read(buf);// 读入缓冲区
        while (br != -1) {
            buf.flip();// 缓冲区读取就绪
            while(buf.hasRemaining()) {
                json.append((char)buf.get());
            }
            buf.clear();
            br = fc.read(buf);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println(json.toString());
}

Buffer

即缓冲区。本质上是一块 可以从中读取数据,或写入数据 的内存,在JAVA NIO中提供了具体的方法来访问内存中的数据

Channel将数据读入Buffer, Buffer将数据写入Channel

channelbuffer.png

Channel读入Buffer:

FileChannel fc = fis.getChannel()
ByteBuffer buf = ByteBuffer.allocate(1024);
int bread = fc.read(buf);

Buffer写入Channel:

// 需要有写入权限,否则会抛异常
// RandomAccessFile fc = new RandomAccessFile("test.json", "rw")
FileChannel fc = fis.getChannel()
ByteBuffer buf = ByteBuffer.allocate(1024);
int bread = fc.write(buf);
主要属性
主要方法

Selector(多路复用)

即选择器

在java nio中,可以通过Selector用单个线程来管理多个通道。它能够检测n个Nio通道的状态(连接就绪、接收就绪、读就绪、写就绪)
现代操作系统的多任务处理上,开销貌似变得越来越小,但程序上仍要控制使用线程的频率,因为线程的切换开销是很昂贵的

selector.jpg
事件模型

我们说NIO 本身是基于事件驱动思想来处理IO的。这里的事件即我们通过Selector注册对应的事件在Channel上。它的事件主要有

连接就绪 SelectionKey.OP_ACCEPT

读就绪 SelectionKey.OP_READ

写就绪 SelectionKey.OP_WRITE

以下文字是摘自美团技术团队 《Java Nio浅析》中的一段

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。

流程

完整代码如下:

Selector selector;

public static void main(String[] args) throws IOException {
  new NioServer(8080).listener();
}

public NioServer(int port) {
try {
    // 打开通道
    ServerSocketChannel server = ServerSocketChannel.open();
    // 绑定端口
    server.bind(new InetSocketAddress(port));
    // 设置阻塞模式,false:nio style.true:oio style.
    server.configureBlocking(false);
    // 创建选择器
    selector = Selector.open();
    // 使用给定的选择器注册此通道
    server.register(selector, SelectionKey.OP_CONNECT);

    System.out.println("服务端已启动,端口" + port);
} catch (IOException e) {
  e.printStackTrace();
}
}

public void listener() throws IOException {
    for(;;) {
        // 阻塞,获取已就绪的key个数
        int wait = selector.select();

        if(wait == 0) continue;
        // 获取所有的事件key
        Set<SelectionKey> readykeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readykeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // 移除已处理数据
            iterator.remove();
            // 处理逻辑
            process(key);
        }
  }
}

/**
* 业务逻辑方法
* @param key
*/
public void process(SelectionKey key) throws IOException {
    // 判断客户端是否已确认连接并且可交互
    if(key.isAcceptable()) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        // 设置阻塞模式
        client.configureBlocking(false);// 非阻塞
        // 注册选择器,设置读模式,告诉client端下次进来便要读取数据
        client.register(selector, SelectionKey.OP_READ, ByteBuffer.wrap("HI!\r\n".getBytes("utf-8")));
        //            // 将此对应的channel设置为准备接受其他客户端的连接请求
        //            key.interestOps(SelectionKey.OP_ACCEPT);
        System.out.println("接受连接来自 "+client);
    }
    // 处理数据读取请求
    if(key.isReadable()) {
        // 取数据
        // 返回该读key 对应的channel
        SocketChannel client = (SocketChannel) key.channel();
        // 获取channel内的数据
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder content = new StringBuilder();
        try {
            while (true) {
                if (!(client.read(buffer) > 0)) break;
                // 翻转缓冲区
                buffer.flip();
                content.append(buffer.toString());
            }
            // 将channel设置为准备下一次读取
            key.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            key.cancel();
            if(key.channel() != null) key.channel().close();
        }
    }
}
```java
Charset charset = Charset.forName("UTF-8");
try(FileInputStream fis = new FileInputStream("test.json");
    FileChannel fc = fis.getChannel()) {
    ByteBuffer headBuf = ByteBuffer.allocate(8);// 头部缓冲区固定大小8字节
    long size = fc.size();
    ByteBuffer bodyBuf = ByteBuffer.allocate((int) size);// 剩下的归为body

    ByteBuffer[] byteBuffers = new ByteBuffer[]{headBuf, bodyBuf};
    fc.read(byteBuffers);

    headBuf.flip();
    System.out.println("head buffer data " + charset.decode(headBuf));

    bodyBuf.flip();
    System.out.println("body buffer data " + charset.decode(bodyBuf));
} catch (IOException e) {
    e.printStackTrace();
}
```

分散/聚集 IO支持,针对的是通道和buffer间的交互

通道可以将数据分散读入多个缓冲区,多个缓冲区可以将数据聚集写入单个通道

说明 特性
连接输出 在内存中写入非顺序放置数据的应用程序可以在一个向量I / O操作中执行此操作。
效率 一个向量I / O读取或写入可以替换许多普通读取或写入,从而节省系统调用所涉及的开销
拆分输入 需要单独处理传输数据的各个部分的情况。例如,如果消息由标题和正文组成,则可以将标题和正文保留在单独的缓冲区中。这样做可以使您更容易分别使用标题和正文

总结

为什么NIO会替代BIO?

​ 可以从两个角度出发

上一篇 下一篇

猜你喜欢

热点阅读