什么是 NIO?

2021-01-31  本文已影响0人  放开那个BUG

1、前言

想必大家肯定被各种公众号上的非阻塞、零拷贝、NIO 之类的概念搞得头大了吧,但其实 Java 的 NIO 概念主要是从 Linux 来的。Linux 实现了 epoll(更确切地说应该是 I/O 多路复用),主要提供 3 个 API:

  • epoll_create1:创建 epoll 文件描述符
  • epoll_wait:等待和 epoll 文件描述符关联的 I/O 事件
  • epoll_ctl:设置 epoll 文件描述符的属性,更新文件描述符和 epoll 文件描述符的关联

于是 Java 在其基础上封装了一下,它的 NIO 事件驱动网络编程主要由以下3部分组成:

  • Buffer:数据缓冲区,对应 epoll 模型的数组
  • Channel:I/O 操作的数据通道,对应 epoll 模型的 socket 的文件描述符(即对应着 connection 连接)
  • Selector 和 selection key:对应 epoll 模型的 epoll 文件描述符。

Java NIO 的事件驱动网络模型和 epoll 一样,也要在一个 event loop 上面加数据处理的 handler。

2、概念

使用 epoll 实现一个 NIO server 的代码如下,可以与 Java 中的 NIO API 对比一下,便可发现其实真的差不多:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>

// Maximum number of ready events fetched by a single epoll_wait() call in main().
#define MAXEVENTS 64
// Size in bytes of the shared receive buffer `buf`.
#define BUF_SIZE 1024
// Scratch buffer that read_all() fills from client sockets before printing.
char buf[BUF_SIZE];
// Output buffers for the numeric host and service strings that
// accept_for_read() obtains from getnameinfo().
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

// Run the server with "./a.out 5000". Open several consoles and run
// "nc 127.0.0.1 5000". Type some characters and hit return. Observe the server behaviour.

// Fatal-error helper: print `api_name` followed by the message for the current
// errno (via perror) and terminate the process with EXIT_FAILURE.
// This function does not return.
static void print_error_and_exit(const char *api_name)
{
  perror(api_name);
  exit(EXIT_FAILURE);
}

// Put descriptor `sfd` into non-blocking mode by OR-ing O_NONBLOCK into its
// current file status flags. A failure of either fcntl() call is fatal: the
// error is reported and the process exits.
static void make_socket_non_blocking(int sfd) {
  const int current = fcntl(sfd, F_GETFL, 0);
  if (current == -1) {
    print_error_and_exit("fcntl");
  }

  if (fcntl(sfd, F_SETFL, current | O_NONBLOCK) == -1) {
    print_error_and_exit("fcntl");
  }
}

// Register `fd` with the epoll instance `efd` for readability (EPOLLIN)
// notifications. EPOLLET is not requested, so the registration is
// level-triggered. The descriptor itself is stored in the event's user data so
// the event loop can tell which fd became ready. A failing epoll_ctl() is
// fatal: the error is reported and the process exits.
static void add_to_epoll_for_read(int efd, int fd) {
  struct epoll_event event;
  // Zero the whole struct first: `data` is a union wider than `int`, so
  // assigning only `data.fd` would pass indeterminate bytes to the kernel.
  memset(&event, 0, sizeof event);
  event.events = EPOLLIN;
  event.data.fd = fd;
  if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1) {
    print_error_and_exit("epoll_ctl");
  }
}

// Resolve the wildcard address for `port` (IPv4 or IPv6, TCP) and return a
// socket bound to the first candidate address for which both socket() and
// bind() succeed. The process exits with EXIT_FAILURE if name resolution
// fails or if no candidate can be bound.
static int create_and_bind(const char *port) {
  const struct addrinfo hints = {
    .ai_flags = AI_PASSIVE,      // all interfaces
    .ai_family = AF_UNSPEC,      // IPv4 and IPv6
    .ai_socktype = SOCK_STREAM,  // TCP socket
  };

  struct addrinfo *candidates;
  const int status = getaddrinfo(NULL, port, &hints, &candidates);
  if (status != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status));
    exit(EXIT_FAILURE);
  }

  // Walk the candidate addresses until one of them can be bound.
  int sock = -1;
  struct addrinfo *cur = candidates;
  while (cur != NULL) {
    sock = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
    if (sock != -1) {
      if (bind(sock, cur->ai_addr, cur->ai_addrlen) == 0) {
        break;
      }
      // This address is unusable; release the socket and try the next one.
      close(sock);
    }
    cur = cur->ai_next;
  }

  // Running off the end of the list means nothing could be bound.
  if (cur == NULL) {
    fprintf(stderr, "could not bind\n");
    exit(EXIT_FAILURE);
  }

  freeaddrinfo(candidates);
  return sock;
}

// Create a non-blocking listening socket on `port` and return its descriptor.
// The socket is bound by create_and_bind(), switched to non-blocking mode and
// then put into the listening state with a backlog of SOMAXCONN. A listen()
// failure is fatal.
static int serve(const char* port) {
  const int listener = create_and_bind(port);
  make_socket_non_blocking(listener);

  if (listen(listener, SOMAXCONN) == -1) {
    print_error_and_exit("listen");
  }
  return listener;
}

// Drain and print everything that is currently readable on `fd`.
//
// `fd` is expected to be non-blocking: the loop keeps calling read() until it
// reports EAGAIN/EWOULDBLOCK (nothing more to read for now), end of file, or a
// real error. Descriptors are registered with plain EPOLLIN (level-triggered),
// so draining here is an optimisation rather than a requirement; any data left
// unread would simply be reported again by the next epoll_wait().
//
// On end of file or on a read error the descriptor is closed.
static void read_all(int fd) {
  int done = 0;

  for (;;) {
    // Read at most sizeof(buf) - 1 bytes so the terminating NUL written below
    // always stays inside the buffer.
    ssize_t count = read(fd, buf, sizeof(buf) - 1);
    if (count == -1) {
      if (errno == EINTR) {
        // Interrupted by a signal before any data arrived: just retry.
        continue;
      }
      // EAGAIN/EWOULDBLOCK means we have read all available data; anything
      // else is a real error and the connection is dropped.
      if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
        perror("read");
        done = 1;
      }
      break;
    }
    if (count == 0) {
      // End of file. The remote has closed the connection.
      done = 1;
      break;
    }

    buf[count] = '\0';
    printf("read %zd bytes: %s\n", count, buf);
  }

  if (done) {
    // Closing the descriptor will make epoll remove it
    // from the set of descriptors which are monitored.
    close(fd);
    printf("file descriptor %d closed\n", fd);
  }
}

// Accept every connection that is currently pending on the non-blocking
// listening socket `sfd`. Each accepted socket is logged (numeric host and
// port, when getnameinfo() succeeds), switched to non-blocking mode and
// registered with the epoll instance `efd` for read notifications.
//
// The loop ends when accept() reports EAGAIN/EWOULDBLOCK (no more pending
// connections) or fails with any other error, which is reported via perror().
static void accept_for_read(int efd, int sfd) {
  for (;;) {
    // sockaddr_storage is large enough for both IPv4 and IPv6 peers; a plain
    // struct sockaddr would truncate IPv6 addresses.
    struct sockaddr_storage in_addr;
    // accept() overwrites the length with the actual address size, so it must
    // be reset to the full buffer size before every call.
    socklen_t in_len = sizeof(in_addr);

    int infd = accept(sfd, (struct sockaddr *)&in_addr, &in_len);
    if (infd == -1) {
      if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
        perror("accept");
      }
      // Either all incoming connections have been processed or accept failed.
      break;
    }

    int ret = getnameinfo((struct sockaddr *)&in_addr, in_len,
        hbuf, sizeof hbuf,
        sbuf, sizeof sbuf,
        NI_NUMERICHOST | NI_NUMERICSERV);
    if (ret == 0) {
      printf("accepted connection on descriptor %d (host=%s, port=%s)\n", infd, hbuf, sbuf);
    }
    make_socket_non_blocking(infd);
    add_to_epoll_for_read(efd, infd);
  }
}

// Entry point: "./a.out <port>".
//
// Sets up a non-blocking listening socket on the given port, registers it with
// a fresh epoll instance and then runs the event loop forever: readiness on
// the listening socket means new connections to accept, readiness on any other
// descriptor means client data to read. Setup failures terminate the process
// with EXIT_FAILURE.
int main(int argc, const char *argv[]) {
  if (argc != 2) {
    fprintf(stderr, "usage: %s [port]\n", argv[0]);
    exit(EXIT_FAILURE);
  }
  int sfd = serve(argv[1]);

  int efd = epoll_create1(0);
  if (efd == -1) {
    print_error_and_exit("epoll_create1");
  }
  add_to_epoll_for_read(efd, sfd);

  struct epoll_event *events = calloc(MAXEVENTS, sizeof *events);
  if (events == NULL) {
    print_error_and_exit("calloc");
  }

  for (;;) {
    // Block (timeout -1) until at least one registered descriptor is ready.
    int nfds = epoll_wait(efd, events, MAXEVENTS, -1);
    if (nfds == -1) {
      if (errno == EINTR) {
        // Interrupted by a signal; this is not a failure, wait again.
        continue;
      }
      print_error_and_exit("epoll_wait");
    }
    for (int i = 0; i < nfds; i++) {
      int fd = events[i].data.fd;
      if (events[i].events & EPOLLERR) {
        // An error condition was reported for this descriptor: drop it.
        fprintf(stderr, "epoll error\n");
        close(fd);
      } else if (sfd == fd) {
        // Notification on the listening socket: one or more incoming
        // connections are waiting to be accepted.
        accept_for_read(efd, sfd);
      } else {
        // Notification on a client socket: data is available to read (or the
        // peer closed the connection).
        read_all(fd);
      }
    }
  }

  // Not reached: the event loop above never terminates. Kept to document
  // which resources this function owns.
  free(events);
  close(sfd);
  return EXIT_SUCCESS;
}
上一篇 下一篇

猜你喜欢

热点阅读