移动 前端 Python Android Java

浅析Linux和JDK对网络通信的实现

2020-12-21  本文已影响0人  zcwfeng
  1. 连接,(客户端,服务器)
  2. 读网络数据
  3. 写网络数据

BIO(Blocking),NIO(IO 多路复用),AIO(异步IO)----Linux并没有完备用NIO实现的异步,Windows 真正实现

面向流 BIO (一个线程服务一个客户端)

面向缓冲区 NIO (一个线程可以服务多个客户端)


nio_buffer.jpeg

Linux网络IO模型

同步和异步,阻塞和非阻塞

Linux下的五种I/O模型:

同步

1)阻塞I/O(blocking I/O)
2)非阻塞I/O (nonblocking I/O)

  1. I/O复用(select 、poll和epoll) (I/O multiplexing)
    4)信号驱动I/O (signal driven I/O (SIGIO))

异步

5)异步I/O (asynchronous I/O )

阻塞I/O(blocking I/O)

2020-12-13 12.30.15.png

进程会一直阻塞,直到数据拷贝完成

IO复用模型

2020-12-13 12.31.41.png

select和epoll;对一个socket,两次调用,两次返回,比阻塞IO并没有什么优越性; 关键是能实现同时对多个socket进行处理。


非阻塞IO模型

2020-12-13 12.34.47.png

非阻塞IO通过进程反复调用IO函数(多次系统调用,并马上返回);在数据拷贝的过程中,进程是阻塞的;

信号驱动IO

2020-12-13 12.35.44.png

套接口进行信号驱动I/O,并安装一个信号处理函数,进程继续运行并不阻塞。 当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用I/O操作函数处理数据。

异步IO模型

2020-12-13 12.37.30.png

当一个异步过程调用发出后,调用者不能立刻得到结果。 实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者的输入输出操作

5个I/O模型的比较:

2020-12-13 12.39.46.png

Linux下的阻塞网络编程

public class Client {

    public static void main(String[] args) throws IOException {
        InetSocketAddress serverAddr =
                new InetSocketAddress("127.0.0.1", 10001);
        Socket socket = new Socket();
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            socket.connect(serverAddr);
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            inputStream = new ObjectInputStream(socket.getInputStream());
            outputStream.writeUTF("top.zcwfeng");
            outputStream.flush();
            System.out.println(inputStream.readUTF());

        } finally {
            if (socket != null) socket.close();
            if (outputStream != null) outputStream.close();
            if (inputStream != null) inputStream.close();
        }
    }

}

单一线程

/**
 * Bio通信的服务端
 */
public class Server {

    public static void main(String[] args) throws IOException {
        //服务端启动必备
        ServerSocket serverSocket = new ServerSocket();
        //表示我们服务器在哪个端口上监听
        serverSocket.bind(new InetSocketAddress(10001));
        System.out.println("Start Server...");
        try {
            while (true){
                new Thread(new ServerTask(serverSocket.accept())).start();
            }
        } finally {
            serverSocket.close();
        }
    }

    public static class ServerTask implements Runnable {
        Socket socket = null;

        public ServerTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            //实例化与客户端通信的输入输出流
            try (ObjectInputStream objectInputStream =
                         new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream =
                         new ObjectOutputStream(socket.getOutputStream())
            ) {
                /*接受客户端的输出,也就是服务器的输入*/
                String userName = objectInputStream.readUTF();
                System.out.println("Accetp client message:"+userName);

                //处理各种实际的业务

                /*服务器的输入的输出,也就是客户端的输入*/
                objectOutputStream.writeUTF("hello," + userName);
                objectOutputStream.flush();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

改为线程池

/**
 * Bio通信的服务端
 */
public class ServerPool {

    private static ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static void main(String[] args) throws IOException {
        //服务端启动必备
        ServerSocket serverSocket = new ServerSocket();
        //表示我们服务器在哪个端口上监听
        serverSocket.bind(new InetSocketAddress(10001));
        System.out.println("Start Server...");
        try {
            while (true) {
                executorService.execute(new ServerTask(serverSocket.accept()));
            }
        } finally {
            serverSocket.close();
        }
    }


    public static class ServerTask implements Runnable {
        Socket socket = null;

        public ServerTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            //实例化与客户端通信的输入输出流
            try (ObjectInputStream objectInputStream =
                         new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream =
                         new ObjectOutputStream(socket.getOutputStream())
            ) {
                /*接受客户端的输出,也就是服务器的输入*/
                String userName = objectInputStream.readUTF();
                System.out.println("Accetp client message:" + userName);

                //处理各种实际的业务

                /*服务器的输入的输出,也就是客户端的输入*/
                objectOutputStream.writeUTF("hello," + userName);
                objectOutputStream.flush();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

从Linux代码结构看网络通信

2020-12-13 12.42.07.png 2020-12-13 12.42.48.png

Linux网络包接收过程

具体流程

2020-12-13 13.08.40.png

JDK中的BIO实现分析

2020-12-13 13.07.06.png

Linux下的IO复用编程

select
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

epoll
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll高效原理和底层机制分析

网卡接收数据

CPU如何知道接收了数据?

2020-12-13 12.46.48.png

进程阻塞

2020-12-13 12.47.57.png

内核接收网络数据

2020-12-13 12.48.50.png

同时监视多个socket

int fds[] =  存放需要监听的socket
while(1){
  int n = select(...,fds,...)
  for(int i = 0;i < fds.count; i++){
    if(FD_ISSET(fds[i],...){
      //fds[i] 处理数据
    }
  }
}

epoll的设计思路

int epfd = epoll_create(...);
epoll_ctl)epfd,...);
while(1){
  int n =  epoll_wait(...)
  for(接受数据的socket){
  // 处理
  }
}

epoll的原理和流程,epoll的实现细节

int epoll_create(int size);

epfd
2020-12-13 12.51.46.png

JDK中NIO的实现分析

2020-12-13 13.03.43.png

SocketChannel和ServerSocketChannel

2020-12-13 13.04.52.png

Selector

创建
SelectorImpl 
EpollSelectorImpl

SelectionKey 有四种状态,
OP_ACCEPT,OP_CONNECT,OP_READ,OP_WRITE

连接,接收连接,读,写

2020-12-13 13.06.11.png
上一篇下一篇

猜你喜欢

热点阅读