Node.js Introduction 5 - Basic Concepts of libuv

2016-05-14 · 转角遇见一直熊

In Node.js Introduction 1 (event-driven, non-blocking I/O) we learned that besides V8, Node has another low-level dependency: libuv. See the figure below.

[Figure: the Node.js core]

Now let's look at what libuv actually does; understanding libuv goes a long way toward understanding Node.

# What libuv provides

> libuv enforces an asynchronous, event-driven style of programming. Its core job is to provide an event loop and callback based notifications of I/O and other activities. libuv offers core utilities like timers, non-blocking networking support, asynchronous file system access, child processes and more.

In other words, libuv gives us an asynchronous, event-driven programming model whose core is an event loop plus callback-based I/O notification; its API covers timers, non-blocking networking, asynchronous file system access, child processes and more.
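To get a feel for that API, here is a minimal sketch of my own (not from the original article, assuming libuv 1.x) of a timer that fires once a second and stops itself after five ticks:

```c
#include <stdio.h>
#include <uv.h>

static int ticks = 0;

/* In libuv 1.x the timer callback receives only the timer handle. */
static void tick(uv_timer_t* handle) {
    if (++ticks == 5)
        uv_timer_stop(handle);   /* no active handles left -> uv_run() returns */
    printf("tick %d\n", ticks);
}

int main() {
    uv_timer_t timer;
    uv_timer_init(uv_default_loop(), &timer);
    /* start after 1000 ms, then repeat every 1000 ms */
    uv_timer_start(&timer, tick, 1000, 1000);
    return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```

The same init/start pattern recurs throughout libuv, as the idle example further below shows.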

# Why we need libuv

While a program is waiting for I/O to complete, we don't want the CPU to be tied up by that waiting program. The programming model libuv provides makes writing such asynchronous programs simple. At its heart is an event loop that, in pseudocode, looks like this:

```
while there are still events to process:
    e = get the next event
    if there is a callback associated with e:
        call the callback
```

The events in the pseudocode above can be, for example:

- a file is ready to be written to
- a socket has received data and is ready to be read
- a timer has expired

# libuv basics

Let's look at a simple libuv program built around an idle watcher (`uv_idle_t`).

```c
#include <stdio.h>
#include <uv.h>

int64_t counter = 0;

void wait_for_a_while(uv_idle_t* handle) {   /* libuv 1.x: the idle callback takes only the handle */
    counter++;

    if (counter >= 10e6)
        uv_idle_stop(handle);
}

int main() {
    uv_idle_t idler;

    uv_idle_init(uv_default_loop(), &idler);
    uv_idle_start(&idler, wait_for_a_while);

    printf("Idling...\n");
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    return 0;
}
```
| Code | Meaning |
| --- | --- |
| `uv_idle_t` | the idle watcher handle |
| `uv_default_loop` | the default event loop (the same loop Node itself runs on) |
| `uv_idle_init` | initialize the idle handle |
| `uv_idle_start` | register the callback and start watching |
| `wait_for_a_while` | the callback |
| `uv_idle_stop` | stop watching |
| `uv_run` | wraps the event loop; in a libuv program it is usually the last call. Because an active watcher was registered, `uv_run()` blocks; once the counter reaches the configured value the idle watcher stops itself, no active watchers remain, and `uv_run()` returns. |

The `uv_XXX_t` name above follows libuv's naming convention. Here are the other types that use the same pattern:

```c
typedef struct uv_loop_s uv_loop_t;
typedef struct uv_err_s uv_err_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_tty_s uv_tty_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_process_s uv_process_t;
typedef struct uv_fs_event_s uv_fs_event_t;
typedef struct uv_fs_poll_s uv_fs_poll_t;
typedef struct uv_signal_s uv_signal_t;
```

Most of these types are used in much the same way as `uv_idle_t`: initialize the handle, start it with a callback, stop it when you are done. Work through them one by one and you will have covered most of the libuv API.
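As an illustration (a sketch of my own, assuming libuv 1.x, not code from the article), `uv_signal_t` follows exactly the same init/start/stop pattern:

```c
#include <stdio.h>
#include <signal.h>
#include <uv.h>

/* signal callback: same start/stop pattern as uv_idle_t */
static void on_sigint(uv_signal_t* handle, int signum) {
    printf("got signal %d, stopping the watcher\n", signum);
    uv_signal_stop(handle);   /* no active handles left -> uv_run() returns */
}

int main() {
    uv_signal_t sig;
    uv_signal_init(uv_default_loop(), &sig);    /* uv_XXX_init(loop, handle)      */
    uv_signal_start(&sig, on_sigint, SIGINT);   /* uv_XXX_start(handle, cb, ...)  */
    printf("press Ctrl-C to fire the handler\n");
    return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```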

# Files

Let's look at the 05_fs_readasync_context example; the code can be downloaded from learnuv.

#include "learnuv.h"

#define BUF_SIZE 37
static const char *filename = __MAGIC_FILE__;

typedef struct context_struct {
  uv_fs_t *open_req;
} context_t;

void read_cb(uv_fs_t* read_req) {
  int r = 0;
  if (read_req->result < 0) CHECK(read_req->result, "uv_fs_read callback");

  /* extracting our context from the read_req */
  context_t* context = read_req->data;

  /* 4. Report the contents of the buffer */
  log_report("%s", read_req->bufs->base);
  log_info("%s", read_req->bufs->base);

  free(read_req->bufs->base);

  /* 5. Close the file descriptor (synchronously) */
  uv_fs_t close_req;
  r = uv_fs_close(uv_default_loop(), &close_req, context->open_req->result, NULL);
  if (r < 0) CHECK(abs(r), "uv_fs_close");

  /* cleanup all requests and context */
  uv_fs_req_cleanup(context->open_req);
  uv_fs_req_cleanup(read_req);
  uv_fs_req_cleanup(&close_req);
  free(context);
}

void init(uv_loop_t *loop) {
  int r = 0;

  /* No more globals, we need to malloc each request and pass it around for later cleanup */
  uv_fs_t *open_req = malloc(sizeof(uv_fs_t));
  uv_fs_t *read_req = malloc(sizeof(uv_fs_t));

  context_t *context = malloc(sizeof(context_t));
  context->open_req  = open_req;

  /* 1. Open file */
  r = uv_fs_open(loop, open_req, filename, O_RDONLY, S_IRUSR, NULL);
  if (r < 0) CHECK(r, "uv_fs_open");

  /* 2. Create buffer and initialize it to turn it into a uv_buf_t */
  size_t buf_len = sizeof(char) * BUF_SIZE;
  char *buf = malloc(buf_len);
  uv_buf_t iov = uv_buf_init(buf, buf_len);

  /* allow us to access the context inside read_cb */
  read_req->data = context;

  /* 3. Read from the file into the buffer */
  r = uv_fs_read(loop, read_req, open_req->result, &iov, 1, 0, read_cb);
  if (r < 0) CHECK(r, "uv_fs_read");
}

int main() {
  uv_loop_t *loop = uv_default_loop();

  init(loop);

  uv_run(loop, UV_RUN_DEFAULT);

  return 0;
}
```

This example demonstrates several techniques that show up again and again in libuv code: every request (`uv_fs_t`) is heap-allocated and later released with `uv_fs_req_cleanup`; a context struct is threaded through the callbacks via the request's `data` field; and synchronous calls (passing `NULL` as the callback, as in `uv_fs_open` and `uv_fs_close` here) can be mixed freely with asynchronous ones.
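The `data` field trick in particular is worth remembering: every libuv handle and request exposes a user-owned `void* data` pointer. Here is a minimal sketch of my own (the names `timer_ctx_t` and `on_timeout` are made up) showing the same pattern on a timer handle:

```c
#include <stdio.h>
#include <uv.h>

/* hypothetical per-handle context */
typedef struct {
    const char *name;
    int fired;
} timer_ctx_t;

static void on_timeout(uv_timer_t* handle) {
    timer_ctx_t *ctx = handle->data;   /* recover the context inside the callback */
    ctx->fired++;
    printf("timer '%s' fired %d time(s)\n", ctx->name, ctx->fired);
    uv_timer_stop(handle);
}

int main() {
    uv_timer_t timer;
    timer_ctx_t ctx = { .name = "demo", .fired = 0 };

    uv_timer_init(uv_default_loop(), &timer);
    timer.data = &ctx;                 /* attach the context before starting */
    uv_timer_start(&timer, on_timeout, 100, 0);
    return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```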

# Networking

Let's look at an echo server; it does the same thing as the command-line echo. The example is quite long in C, while the JavaScript equivalent would be only a few lines, which shows the advantage JavaScript has for this kind of code.

#include "learnuv.h"
#include <math.h>

const static char* HOST    = "0.0.0.0"; /* localhost */
const static int   PORT    = 7000;
const static int   NBUFS   = 1;         /* number of buffers we write at once */

static uv_tcp_t tcp_server;

typedef struct {
  uv_write_t req;
  uv_buf_t buf;
} write_req_t;

/* forward declarations */
static void close_cb(uv_handle_t* client);
static void server_close_cb(uv_handle_t*);
static void shutdown_cb(uv_shutdown_t*, int);

static void alloc_cb(uv_handle_t*, size_t, uv_buf_t*);
static void read_cb(uv_stream_t*, ssize_t, const uv_buf_t*);
static void write_cb(uv_write_t*, int);

static void close_cb(uv_handle_t* client) {
  free(client);
  log_info("Closed connection");
}

static void shutdown_cb(uv_shutdown_t* req, int status) {
  uv_close((uv_handle_t*) req->handle, close_cb);
  free(req);
}

static void onconnection(uv_stream_t *server, int status) {
  CHECK(status, "onconnection");

  int r = 0;
  uv_shutdown_t *shutdown_req;

  /* 4. Accept client connection */
  log_info("Accepting Connection");

  /* 4.1. Init client connection using `server->loop`, passing the client handle */
  uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
  r = uv_tcp_init(server->loop, client);
  CHECK(r, "uv_tcp_init");

  /* 4.2. Accept the now initialized client connection */
  r = uv_accept(server, (uv_stream_t*) client);
  if (r) {
    log_error("trying to accept connection %d", r);

    shutdown_req = malloc(sizeof(uv_shutdown_t));
    r = uv_shutdown(shutdown_req, (uv_stream_t*) client, shutdown_cb);
    CHECK(r, "uv_shutdown");
  }

  /* 5. Start reading data from client */
  r = uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
  CHECK(r, "uv_read_start");
}

static void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
  /* libuv suggests a buffer size but leaves it up to us to create one of any size we see fit */
  buf->base = malloc(size);
  buf->len = size;
  if (buf->base == NULL) log_error("alloc_cb buffer didn't properly initialize");
}

static void read_cb(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
  int r = 0;
  uv_shutdown_t *shutdown_req;

  /* Errors or EOF */
  if (nread < 0) {
    if (nread != UV_EOF) CHECK(nread, "read_cb");

    /* Client signaled that all data has been sent, so we can close the connection and are done */
    free(buf->base);

    shutdown_req = malloc(sizeof(uv_shutdown_t));
    r = uv_shutdown(shutdown_req, client, shutdown_cb);
    CHECK(r, "uv_shutdown");
    return;
  }

  if (nread == 0) {
    /* Everything OK, but nothing read and thus we don't write anything */
    free(buf->base);
    return;
  }

  /* Check if we should quit the server which the client signals by sending "QUIT" */
  if (!strncmp("QUIT", buf->base, fmin(nread, 4))) {
    log_info("Closing the server");
    free(buf->base);
    /* Before exiting we need to properly close the server via uv_close */
    /* We can do this synchronously */
    uv_close((uv_handle_t*) &tcp_server, NULL);
    log_info("Closed server, exiting");
    exit(0);
  }

  /* 6. Write same data back to client since we are an *echo* server and thus can reuse the buffer used to read*/
  /*    We wrap the write req and buf here in order to be able to clean them both later */
  write_req_t *write_req = malloc(sizeof(write_req_t));
  write_req->buf = uv_buf_init(buf->base, nread);
  r = uv_write(&write_req->req, client, &write_req->buf, NBUFS, write_cb);
  CHECK(r, "uv_write");
}

static void write_cb(uv_write_t *req, int status) {
  CHECK(status, "write_cb");

  log_info("Replied to client");

  /* Since the req is the first field inside the wrapper write_req, we can just cast to it */
  /* Basically we are telling C to include a bit more data starting at the same memory location, which in this case is our buf */
  write_req_t *write_req = (write_req_t*) req;
  free(write_req->buf.base);
  free(write_req);
}

int main() {
  int r = 0;
  uv_loop_t *loop = uv_default_loop();

  /* 1. Initialize TCP server */
  r = uv_tcp_init(loop, &tcp_server);
  CHECK(r, "uv_tcp_init");

  /* 2. Bind to localhost:7000 */
  struct sockaddr_in addr;
  r = uv_ip4_addr(HOST, PORT, &addr);
  CHECK(r, "uv_ip4_addr");

  r = uv_tcp_bind(&tcp_server, (struct sockaddr*) &addr, 0 /* flags: 0 for a plain IPv4 bind */);
  CHECK(r, "uv_tcp_bind");

  /* 3. Start listening */
  /* uv_tcp_t inherits uv_stream_t so casting is ok */
  r = uv_listen((uv_stream_t*) &tcp_server, SOMAXCONN, onconnection);
  CHECK(r, "uv_listen");
  log_info("Listening on %s:%d", HOST, PORT);

  uv_run(loop, UV_RUN_DEFAULT);

  MAKE_VALGRIND_HAPPY();

  return 0;
}
```

A few techniques in this code are worth pointing out: the per-connection `uv_tcp_t` handles and the `write_req_t` wrapper are heap-allocated and freed in the close/write callbacks; `uv_tcp_t` is cast to `uv_stream_t*` because TCP handles are streams; `alloc_cb` lets us decide how read buffers are allocated; and connections are torn down through the `uv_shutdown` → `uv_close` → `close_cb` sequence.

Because quite a few callbacks are involved the control flow is not very easy to follow, but that is about as clear as asynchronous callbacks get in plain C. Async/await style asynchronous programming is much clearer; it is available in C# and in modern JavaScript (it arrived after ES6, in ES2017).

# Threads

libuv also wraps threads in a cross-platform way. Inside libuv, threads mainly exist to support the event loop, since asynchronous calls are ultimately still implemented with threads; in addition, developers can use threads to run CPU-intensive work, and threads can communicate with each other.

Let's look at an example of inter-thread communication. To receive progress data about a (fake) download, we use `uv_async_send` and `uv_async_init`.

```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>

uv_loop_t *loop;
uv_async_t async;

void fake_download(uv_work_t *req) {
    int size = *((int*) req->data);
    int downloaded = 0;
    double percentage;
    while (downloaded < size) {
        percentage = downloaded*100.0/size;
        async.data = (void*) &percentage;
        uv_async_send(&async);

        sleep(1);
        downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
                                           // but at least a 200;
    }
}

void print_progress(uv_async_t *handle) {   /* libuv 1.x: the async callback takes only the handle */
    double percentage = *((double*) handle->data);
    fprintf(stderr, "Downloaded %.2f%%\n", percentage);
}

void after(uv_work_t *req, int status) {
    fprintf(stderr, "Download complete\n");
    uv_close((uv_handle_t*) &async, NULL);
}


int main() {
    loop = uv_default_loop();

    uv_work_t req;
    int size = 10240;
    req.data = (void*) &size;

    uv_async_init(loop, &async, print_progress);
    uv_queue_work(loop, &req, fake_download, after);

    return uv_run(loop, UV_RUN_DEFAULT);
}
```

`uv_queue_work` above is very useful: it lets us hand a blocking, synchronous piece of work to another thread (the threads are managed by libuv's thread pool), effectively a simplified interface to multi-threaded programming. Running the download inside `fake_download` does not block `uv_default_loop`, so the main loop can keep processing other events (for example, more HTTP requests).

`fake_download` above calls `uv_async_send(&async)` to send a message through `async`; the message is handled in `print_progress`, which runs on the loop thread (the thread running `uv_default_loop`).

Once you use threads you inevitably need to synchronize shared state. libuv also provides locks, semaphores, condition variables and barriers, with APIs that closely mirror their pthreads counterparts.
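As a small illustration (my own sketch, not from the article), the thread and mutex API looks like this:

```c
#include <stdio.h>
#include <uv.h>

static uv_mutex_t lock;
static long total = 0;

/* thread entry point: uv_thread_create passes the void* argument through */
static void worker(void *arg) {
    long n = *(long *) arg;
    for (long i = 0; i < n; i++) {
        uv_mutex_lock(&lock);     /* same idea as pthread_mutex_lock */
        total++;
        uv_mutex_unlock(&lock);
    }
}

int main() {
    long n = 100000;
    uv_thread_t t1, t2;

    uv_mutex_init(&lock);
    uv_thread_create(&t1, worker, &n);
    uv_thread_create(&t2, worker, &n);

    uv_thread_join(&t1);          /* same idea as pthread_join */
    uv_thread_join(&t2);
    uv_mutex_destroy(&lock);

    printf("total = %ld\n", total);   /* always 200000 thanks to the mutex */
    return 0;
}
```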

# Processes

libuv carefully wraps process handling and hides the differences between operating systems. It also lets a process and its child processes communicate through streams and named pipes.

```c
#include <stdio.h>
#include <uv.h>

uv_loop_t *loop;
uv_process_t child_req;
uv_process_options_t options;

void on_exit(uv_process_t *req, int64_t exit_status, int term_signal) {   /* libuv 1.x: exit_status is int64_t */
    fprintf(stderr, "Process exited with status %lld, signal %d\n", (long long) exit_status, term_signal);
    uv_close((uv_handle_t*) req, NULL);
}

int main() {
    loop = uv_default_loop();

    char* args[3];
    args[0] = "mkdir";
    args[1] = "test-dir";
    args[2] = NULL;

    options.exit_cb = on_exit;
    options.file = "mkdir";
    options.args = args;

    int r = uv_spawn(loop, &child_req, &options);   /* libuv 1.x takes a pointer to the options */
    if (r) {
        fprintf(stderr, "%s\n", uv_strerror(r));    /* libuv 1.x reports errors as plain int codes */
        return 1;
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}
```
Note `uv_spawn` and `uv_process_t`. There are also pipe APIs available to us, such as `uv_pipe_init`. This example is fairly simple; you can find examples online that use multiple processes for load balancing and dig further from there.
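To give an idea of how `uv_pipe_init` fits in, here is a hedged sketch of my own (not the article's code): the child's stdout is wired to a `uv_pipe_t` through the `stdio` field of `uv_process_options_t`, so the parent can read the child's output as an ordinary libuv stream.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

static uv_pipe_t child_stdout;
static uv_process_t child;
static uv_process_options_t options;

static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) {
    buf->base = malloc(suggested);
    buf->len = suggested;
}

static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread > 0)
        fprintf(stderr, "child said: %.*s", (int) nread, buf->base);
    else if (nread < 0)                        /* UV_EOF or error: stop reading */
        uv_close((uv_handle_t *) stream, NULL);
    free(buf->base);                           /* free(NULL) is a safe no-op */
}

static void exit_cb(uv_process_t *req, int64_t exit_status, int term_signal) {
    uv_close((uv_handle_t *) req, NULL);
}

int main() {
    uv_loop_t *loop = uv_default_loop();

    /* the pipe that will carry the child's stdout back to us */
    uv_pipe_init(loop, &child_stdout, 0);      /* 0: not used for passing handles (ipc) */

    char *args[] = { "echo", "hello from the child", NULL };

    uv_stdio_container_t stdio[3];
    stdio[0].flags = UV_IGNORE;                            /* child stdin  */
    stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;    /* child stdout */
    stdio[1].data.stream = (uv_stream_t *) &child_stdout;
    stdio[2].flags = UV_INHERIT_FD;                        /* child stderr */
    stdio[2].data.fd = 2;

    memset(&options, 0, sizeof(options));
    options.file = args[0];
    options.args = args;
    options.exit_cb = exit_cb;
    options.stdio = stdio;
    options.stdio_count = 3;

    int r = uv_spawn(loop, &child, &options);
    if (r) {
        fprintf(stderr, "uv_spawn: %s\n", uv_strerror(r));
        return 1;
    }

    uv_read_start((uv_stream_t *) &child_stdout, alloc_cb, read_cb);
    return uv_run(loop, UV_RUN_DEFAULT);
}
```

The stdio flags are given from the child's perspective: the child writes to its stdout, so that slot is marked `UV_WRITABLE_PIPE`, while the parent reads from the other end of the pipe.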


----
# Summary
There are still many APIs we have not touched. The goal of this article is not to translate the documentation, but to introduce the basic concepts of libuv, so that when you go on to read the Node source code the many libuv APIs no longer look completely unfamiliar.

The material in this article mainly comes from:

- [learnuv](https://github.com/thlorenz/learnuv.git) — all of the code in this article can be found here.
- [libuv-dox](https://github.com/thlorenz/libuv-dox)
- [uvbook Chinese translation](http://www.nowx.org/uvbook/index.html)