io专家

Linux中的异步IO

2018-12-12  本文已影响14人  CXYMichael

知道异步IO已经很久了,但是直到最近,才真正用它来解决一下实际问题(在一个CPU密集型的应用中,有一些需要处理的数据可能放在磁盘上。预先知道这些数据的位置,所以预先发起异步IO读请求。等到真正需要用到这些数据的时候,再等待异步IO完成。使用了异步IO,在发起IO请求到实际使用数据这段时间内,程序还可以继续做其他事情)。
假此机会,也顺便研究了一下linux下的异步IO的实现。

linux下主要有两套异步IO,一套是由glibc实现的(以下称之为glibc版本)、一套是由linux内核实现,并由libaio来封装调用接口(以下称之为linux版本)。

glibc版本

接口 glibc版本主要包含如下接口:

int aio_read(struct aiocb *aiocbp);  /* 提交一个异步读 */
int aio_write(struct aiocb *aiocbp); /* 提交一个异步写 */
int aio_cancel(int fildes, struct aiocb *aiocbp); /* 取消一个异步请求(或基于一个fd的所有异步请求,aiocbp==NULL) */
int aio_error(const struct aiocb *aiocbp);        /* 查看一个异步请求的状态(进行中EINPROGRESS?还是已经结束或出错?) */
ssize_t aio_return(struct aiocb *aiocbp);         /* 查看一个异步请求的返回值(跟同步读写定义的一样) */
int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout); /* 阻塞等待请求完成 */

其中,struct aiocb主要包含以下字段:

int               aio_fildes;        /* 要被读写的fd */
void *            aio_buf;           /* 读写操作对应的内存buffer */
__off64_t         aio_offset;        /* 读写操作对应的文件偏移 */
size_t            aio_nbytes;        /* 需要读写的字节长度 */
int               aio_reqprio;       /* 请求的优先级 */
struct sigevent   aio_sigevent;      /* 异步事件,定义异步操作完成时的通知信号或回调函数 */

实现 glibc的aio实现是比较通俗易懂的:
1、异步请求被提交到request_queue中;
2、request_queue实际上是一个表结构,"行"是fd、"列"是具体的请求。也就是说,同一个fd的请求会被组织在一起;
3、异步请求有优先级概念,属于同一个fd的请求会按优先级排序,并且最终被按优先级顺序处理;
4、随着异步请求的提交,一些异步处理线程被动态创建。这些线程要做的事情就是从request_queue中取出请求,然后处理之;
5、为避免异步处理线程之间的竞争,同一个fd所对应的请求只由一个线程来处理;
6、异步处理线程同步地处理每一个请求,处理完成后在对应的aiocb中填充结果,然后触发可能的信号通知或回调函数(回调函数是需要创建新线程来调用的);
7、异步处理线程在完成某个fd的所有请求后,进入闲置状态;
8、异步处理线程在闲置状态时,如果request_queue中有新的fd加入,则重新投入工作,去处理这个新fd的请求(新fd和它上一次处理的fd可以不是同一个);
9、异步处理线程处于闲置状态一段时间后(没有新的请求),则会自动退出。等到再有新的请求时,再去动态创建;
看起来,换作是我们,要在用户态实现一个异步IO,似乎大概也会设计成类似的样子……

linux版本

接口 下面再来看看linux版本的异步IO。它主要包含如下系统调用接口:

int io_setup(int maxevents, io_context_t *ctxp);  /* 创建一个异步IO上下文(io_context_t是一个句柄) */
int io_destroy(io_context_t ctx);  /* 销毁一个异步IO上下文(如果有正在进行的异步IO,取消并等待它们完成) */
long io_submit(aio_context_t ctx_id, long nr, struct iocb **iocbpp);  /* 提交异步IO请求 */
long io_cancel(aio_context_t ctx_id, struct iocb *iocb, struct io_event *result);  /* 取消一个异步IO请求 */
longio_getevents(aio_context_t ctx_id, long min_nr, long nr, structio_event *events, struct timespec *timeout)  /*等待并获取异步IO请求的事件(也就是异步请求的处理结果) */

其中,struct iocb主要包含以下字段:

__u16     aio_lio_opcode;     /* 请求类型(如:IOCB_CMD_PREAD=读、IOCB_CMD_PWRITE=写、等) */
__u32     aio_fildes;         /* 要被操作的fd */
__u64     aio_buf;            /* 读写操作对应的内存buffer */
__u64     aio_nbytes;         /* 需要读写的字节长度 */
__s64     aio_offset;         /* 读写操作对应的文件偏移 */
__u64     aio_data;           /* 请求可携带的私有数据(在io_getevents时能够从io_event结果中取得) */
__u32     aio_flags;          /* 可选IOCB_FLAG_RESFD标记,表示异步请求处理完成时使用eventfd进行通知(百度一下) */
__u32     aio_resfd;          /* 有IOCB_FLAG_RESFD标记时,接收通知的eventfd */

其中,struct io_event主要包含以下字段:
__u64     data;               /* 对应iocb的aio_data的值 */
__u64     obj;                /* 指向对应iocb的指针 */
__s64     res;                /* 对应IO请求的结果(>=0: 相当于对应的同步调用的返回值;<0: -errno) */

实现 io_context_t句柄在内核中对应一个struct kioctx结构,用来给一组异步IO请求提供一个上下文。其主要包含以下字段:

struct mm_struct*     mm;             /* 调用者进程对应的内存管理结构(代表了调用者的虚拟地址空间) */
unsigned long         user_id;        /* 上下文ID,也就是io_context_t句柄的值(等于ring_info.mmap_base) */
struct hlist_node     list;           /* 属于同一地址空间的所有kioctx结构通过这个list串连起来,链表头是mm->ioctx_list */
wait_queue_head_t     wait;           /* 等待队列(io_getevents系统调用可能需要等待,调用者就在该等待队列上睡眠) */
int                   reqs_active;    /* 进行中的请求数目 */
struct list_head      active_reqs;    /* 进行中的请求队列 */
unsigned              max_reqs;       /* 最大请求数(对应io_setup调用的int maxevents参数) */
struct list_head      run_list;       /* 需要aio线程处理的请求列表(某些情况下,IO请求可能交给aio线程来提交) */
struct delayed_work   wq;             /* 延迟任务队列(当需要aio线程处理请求时,将wq挂入aio线程对应的请求队列) */
struct aio_ring_info  ring_info;      /* 存放请求结果io_event结构的ring buffer */

其中,这个aio_ring_info结构比较值得一提,它是用于存放请求结果io_event结构的ring buffer。它主要包含了如下字段:

unsigned long   mmap_base;       /* ring buffer的地始地址 */
unsigned long   mmap_size;       /* ring buffer分配空间的大小 */
struct page**   ring_pages;      /* ring buffer对应的page数组 */
long            nr_pages;        /* 分配空间对应的页面数目(nr_pages * PAGE_SIZE = mmap_size) */
unsigned        nr, tail;        /* 包含io_event的数目及存取游标 */

这个数据结构看起来有些奇怪,直接弄一个io_event数组不就完事了么?为什么要维护mmap_base、mmap_size、ring_pages、nr_pages这么复杂的一组信息,而又把io_event结构隐藏起来呢?
这里的奇妙之处就在于,io_event结构的buffer是在用户态地址空间上分配的。注意,我们在内核里面看到了诸多数据结构都是在内核地址空间上分配的,因为这些结构都是内核专有的,没必要给用户程序看到,更不能让用户程序去修改。而这里的io_event却是有意让用户程序看到,而且用户就算修改了也不会对内核的正确性造成影响。于是这里使用了这样一个有些取巧的办法,由内核在用户态地址空间上分配buffer。(如果换一个保守点的做法,内核态可以维护io_event的buffer,然后io_getevents的时候,将对应的io_event复制一份到用户空间。)
按照这样的思路,io_setup时,内核会通过mmap在对应的用户空间分配一段内存,mmap_base、mmap_size就是这个内存映射对应的位置和大小。然后,光有映射还不行,还必须立马分配物理内存,ring_pages、nr_pages就是分配好的物理页面。(因为这些内存是要被内核直接访问的,内核会将异步IO的结果写入其中。如果物理页面延迟分配,那么内核访问这些内存的时候会发生缺页异常。而处理内核态的缺页异常又很麻烦,所以还不如直接分配物理内存的好。其二,内核在访问这个buffer里的信息时,也并不是通过mmap_base这个虚拟地址去直接访问的。既然是异步,那么结果写回的时候可能是在另一个上下文上面,虚拟地址空间都不同。为了避免进行虚拟地址空间的切换,内核干脆直接通过kmap将ring_pages映射到高端内存上去访问好了。)

然后,在mmap_base指向的用户空间的地址上,会存放着一个struct aio_ring结构,用来管理这个ring buffer。其主要包含了如下字段:

unsigned         id;                /* 等于aio_ring_info中的user_id */
unsigned         nr;                /* 等于aio_ring_info中的nr */
unsigned         head,tail;         /* io_events数组的游标 */
unsigned         magic,compat_features,incompat_features;
unsigned         header_length;     /* aio_ring结构的大小 */
struct io_event  io_events[0];      /* io_event的buffer */

终于,我们期待的io_event数组出现了。

看到这里,如果前面的内容你已经理解清楚了,你一定会有个疑问:既然整个aio_ring结构及其中的io_event缓冲都是放在用户空间的,内核还提供io_getevents系统调用干什么?用户程序不是直接就可以取用io_event,并且修改游标了么(内核作为生产者,修改aio_ring->tail;用户作为消费者,修改aio_ring->head)?我想,aio_ring之所以要放在用户空间,其原本用意应该就是这样的。
那么,用户空间如何知道aio_ring结构的地址(aio_ring_info->mmap_base)呢?其实kioctx结构中的user_id,也就是io_setup返回给用户的io_context_t,就等于aio_ring_info->mmap_base。
然后,aio_ring结构中还有诸如magic、compat_features、incompat_features这样的字段,用户空间可以读这些magic,以确定数据结构没有被异常篡改。如果一切可控,那么就自己动手、丰衣足食;否则就还是走io_getevents系统调用。而io_getevents系统调用通过aio_ring_info->ring_pages得到aio_ring结构,再将相应的io_event拷贝到用户空间。
下面贴一段libaio中的io_getevents的代码(前面提到过,linux版本的异步IO是由用户态的libaio来封装的):

int io_getevents_0_4(io_context_t ctx, long min_nr, long nr, struct io_event * events, struct timespec * timeout){
    struct aio_ring *ring;
    ring = (struct aio_ring*)ctx;
    if (ring==NULL || ring->magic != AIO_RING_MAGIC)
        goto do_syscall;
    if (timeout!=NULL && timeout->tv_sec == 0 && timeout->tv_nsec == 0) {
        if (ring->head == ring->tail)
            return 0;
    }
do_syscall:
    return __io_getevents_0_4(ctx, min_nr, nr, events, timeout);
}

其中确实用到了用户空间上的aio_ring结构的信息,不过尺度还是不够大。

以上就是异步IO的context的结构。那么,为什么linux版本的异步IO需要“上下文”这么个概念,而glibc版本则不需要呢?
在glibc版本中,异步处理线程是glibc在调用者进程中动态创建的线程,它和调用者必定是在同一个虚拟地址空间中的。这里已经隐含了“同一上下文”这么个关系。
而对于内核来说,要面对的是任意的进程,任意的虚拟地址空间。当处理一个异步请求时,内核需要在调用者对应的地址空间中存取数据,必须知道这个虚拟地址空间是什么。不过当然,如果设计上要想把“上下文”这个概念隐藏了也是肯定可以的(比如让每个mm隐含一个异步IO上下文)。具体如何选择,只是设计上的问题。

struct iocb在内核中又对应到struct kiocb结构,主要包含以下字段:

struct kioctx*       ki_ctx;           /* 请求对应的kioctx(上下文结构) */
struct list_head     ki_run_list;      /* 需要aio线程处理的请求,通过该字段链入ki_ctx->run_list */
struct list_head     ki_list;          /* 链入ki_ctx->active_reqs */
struct file*         ki_filp;          /* 对应的文件指针 */
void __user*         ki_obj.user;      /* 指向用户态的iocb结构 */
__u64                ki_user_data;     /* 等于iocb->aio_data */
loff_t               ki_pos;           /* 等于iocb->aio_offset */
unsigned short       ki_opcode;        /* 等于iocb->aio_lio_opcode */
size_t               ki_nbytes;        /* 等于iocb->aio_nbytes */
char __user *        ki_buf;           /* 等于iocb->aio_buf */
size_t               ki_left;          /* 该请求剩余字节数(初值等于iocb->aio_nbytes) */
struct eventfd_ctx*  ki_eventfd;       /* 由iocb->aio_resfd对应的eventfd对象 */
ssize_t (*ki_retry)(struct kiocb *);   /*由ki_opcode选择的请求提交函数*/

调用io_submit后,对应于用户传递的每一个iocb结构,会在内核态生成一个与之对应的kiocb结构,并且在对应kioctx结构的ring_info中预留一个io_events的空间。之后,请求的处理结果就被写到这个io_event中。
然后,对应的异步读写(或其他)请求就被提交到了虚拟文件系统,实际上就是调用了file->f_op->aio_read或file->f_op->aio_write(或其他)。也就是,在经历磁盘高速缓存层、通用块层之后,请求被提交到IO调度层,等待被处理。这个跟普通的文件读写请求是类似的。
在《linux文件读写浅析》中可以看到,对于非direct-io的读请求来说,如果pagecache不命中,那么IO请求会被提交到底层。之后,do_generic_file_read会通过lock_page操作,等待数据最终读完。这一点跟异步IO是背道而驰的,因为异步就意味着请求提交后不能等待,必须马上返回。而对于非direct-io的写请求,写操作一般仅仅是将数据更新作用到page cache上,并不需要真正的写磁盘。pagecache写回磁盘本身是一个异步的过程。可见,对于非direct-io的文件读写,使用linux版本的异步IO接口完全没有意义(就跟使用同步接口效果一样)。
为什么会有这样的设计呢?因为非direct-io的文件读写是只跟page cache打交道的。而pagecache是内存,跟内存打交道又不会存在阻塞,那么也就没有什么异步的概念了。至于读写磁盘时发生的阻塞,那是pagecache跟磁盘打交道时发生的事情,跟应用程序又没有直接关系。
然而,对于direct-io来说,异步则是有意义的。因为direct-io是应用程序的buffer跟磁盘的直接交互(不使用page cache)。

这里,在使用direct-io的情况下,file->f_op->aio_{read,write}提交完IO请求就直接返回了,然后io_submit系统调用返回。(见后面的执行流程。)
通过linux内核异步触发的IO调度(如:被时钟中断触发、被其他的IO请求触发、等),已经提交的IO请求被调度,由对应的设备驱动程序提交给具体的设备。对于磁盘,一般来说,驱动程序会发起一次DMA。然后又经过若干时间,读写请求被磁盘处理完成,CPU将收到表示DMA完成的中断信号,设备驱动程序注册的处理函数将在中断上下文中被调用。这个处理函数会调用end_request函数来结束这次请求。这个流程跟《linux文件读写浅析》中所说的非direct-io读操作的情况是一样的。
不同的是,对于同步非direct-io,end_request将通过清除page结构的PG_locked标记来唤醒被阻塞的读操作流程,异步IO和同步IO效果一样。而对于direct-io,除了唤醒被阻塞的读操作流程(同步IO)或io_getevents流程(异步IO)之外,还需要将IO请求的处理结果填回对应的io_event中。
最后,等到调用者调用io_getevents的时候,就能获取到请求对应的结果(io_event)。而如果调用io_getevents的时候结果还没出来,流程也会被阻塞,并且会在direct-io的end_request过程中得到唤醒。

linux版本的异步IO也有aio线程(每CPU一个),但是跟glibc版本中的异步处理线程不同,这里的aio线程是用来处理请求重试的。某些情况下,file->f_op->aio_{read,write}可能会返回-EIOCBRETRY,表示需要重试(只有一些特殊的IO设备会这样)。而调用者既然使用的是异步IO接口,肯定不希望里面会有等待/重试的逻辑。所以,如果遇到-EIOCBRETRY,内核就在当前CPU对应的aio线程添加一个任务,让aio线程来完成请求的重新提交。而调用流程可以直接返回,不需要阻塞。
请求在aio线程中提交和在调用者进程中提交相比,有一个最大的不同,就是aio线程使用的地址空间可能跟调用者线程不一样。需要利用kioctx->mm切换到正确的地址空间,然后才能发请求。(参见《浅尝异步IO》中的讨论。)

内核处理流程 最后,整理一下direct-io异步读操作的处理流程:
io_submit。对于提交的iocbpp数组中的每一个iocb(异步请求),调用io_submit_one来提交它们;
io_submit_one。为请求分配一个kiocb结构,并且在对应的kioctx的ring_info中为它预留一个对应的io_event。然后调用aio_rw_vect_retry来提交这个读请求;
aio_rw_vect_retry。调用file->f_op->aio_read。这个函数通常是由generic_file_aio_read或者其封装来实现的;
generic_file_aio_read。对于非direct-io,会调用do_generic_file_read来处理请求(见《linux文件读写浅析》)。而对于direct-io,则是调用mapping->a_ops->direct_IO。这个函数通常就是blkdev_direct_IO;
blkdev_direct_IO。调用filemap_write_and_wait_range将相应位置可能存在的page cache废弃掉或刷回磁盘(避免产生不一致),然后调用direct_io_worker来处理请求;
direct_io_worker。一次读可能包含多个读操作(对应于类readv系统调用),对于其中的每一个,调用do_direct_IO;
do_direct_IO。调用submit_page_section;
submit_page_section。调用dio_new_bio分配对应的bio结构,然后调用dio_bio_submit来提交bio;
dio_bio_submit。调用submit_bio提交请求。后面的流程就跟非direct-io是一样的了,然后等到请求完成,驱动程序将调用bio->bi_end_io来结束这次请求。对于direct-io下的异步IO,bio->bi_end_io等于dio_bio_end_aio;
dio_bio_end_aio。调用wake_up_process唤醒被阻塞的进程(异步IO下,主要是io_getevents的调用者)。然后调用aio_complete;
aio_complete。将处理结果写回到对应的io_event中;

比较

从上面的流程可以看出,linux版本的异步IO实际上只是利用了CPU和IO设备可以异步工作的特性(IO请求提交的过程主要还是在调用者线程上同步完成的,请求提交后由于CPU与IO设备可以并行工作,所以调用流程可以返回,调用者可以继续做其他事情)。相比同步IO,并不会占用额外的CPU资源。
而glibc版本的异步IO则是利用了线程与线程之间可以异步工作的特性,使用了新的线程来完成IO请求,这种做法会额外占用CPU资源(对线程的创建、销毁、调度都存在CPU开销,并且调用者线程和异步处理线程之间还存在线程间通信的开销)。不过,IO请求提交的过程都由异步处理线程来完成了(而linux版本是调用者来完成的请求提交),调用者线程可以更快地响应其他事情。如果CPU资源很富足,这种实现倒也还不错。

还有一点,当调用者连续调用异步IO接口,提交多个异步IO请求时。在glibc版本的异步IO中,同一个fd的读写请求由同一个异步处理线程来完成。而异步处理线程又是同步地、一个一个地去处理这些请求。所以,对于底层的IO调度器来说,它一次只能看到一个请求。处理完这个请求,异步处理线程才会提交下一个。而内核实现的异步IO,则是直接将所有请求都提交给了IO调度器,IO调度器能看到所有的请求。请求多了,IO调度器使用的类电梯算法就能发挥更大的功效。请求少了,极端情况下(比如系统中的IO请求都集中在同一个fd上,并且不使用预读),IO调度器总是只能看到一个请求,那么电梯算法将退化成先来先服务算法,可能会极大的增加碰头移动的开销。

最后,glibc版本的异步IO支持非direct-io,可以利用内核提供的page cache来提高效率。而linux版本只支持direct-io,cache的工作就只能靠用户程序来实现了。

Linux aio是Linux下的异步读写模型。
对于文件的读写,即使以O_NONBLOCK方式来打开一个文件,也会处于"阻塞"状态。因为文件时时刻刻处于可读状态。而从磁盘到内存所等待的时间是惊人的。为了充份发挥把数据从磁盘复制到内存的时间,引入了aio模型。linux下有aio封装,但是aio采用的是线程或信号用以通知,为了能更多的控制io行为,可以使用更为低级libaio。

一、基本函数与结构

1. libaio函数

extern int io_setup(int maxevents, io_context_t *ctxp);
extern int io_destroy(io_context_t ctx);
extern int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
extern int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt);
extern int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

2. 结构

struct io_iocb_poll {
    PADDED(int events, __pad1);
};  /* result code is the set of result flags or -'ve errno */

struct io_iocb_sockaddr {
    struct sockaddr *addr;
    int     len;
};  /* result code is the length of the sockaddr, or -'ve errno */

struct io_iocb_common {
    PADDEDptr(void  *buf, __pad1);
    PADDEDul(nbytes, __pad2);
    long long   offset;
    long long   __pad3;
    unsigned    flags;
    unsigned    resfd;
};  /* result code is the amount read or -'ve errno */

struct io_iocb_vector {
    const struct iovec  *vec;
    int         nr;
    long long       offset;
};  /* result code is the amount read or -'ve errno */

struct iocb {
    PADDEDptr(void *data, __pad1);  /* Return in the io completion event */
    PADDED(unsigned key, __pad2);   /* For use in identifying io requests */

    short       aio_lio_opcode; 
    short       aio_reqprio;
    int     aio_fildes;

    union {
        struct io_iocb_common       c;
        struct io_iocb_vector       v;
        struct io_iocb_poll     poll;
        struct io_iocb_sockaddr saddr;
    } u;
};

struct io_event {
    PADDEDptr(void *data, __pad1);
    PADDEDptr(struct iocb *obj,  __pad2);
    PADDEDul(res,  __pad3);
    PADDEDul(res2, __pad4);
};

3. 内联函数

static inline void io_set_callback(struct iocb *iocb, io_callback_t cb);
static inline void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
static inline void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);
static inline void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
static inline void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec *iov, int iovcnt, long long offset);
/* Jeff Moyer says this was implemented in Red Hat AS2.1 and RHEL3.
 * AFAICT, it was never in mainline, and should not be used. --RR */
static inline void io_prep_poll(struct iocb *iocb, int fd, int events);
static inline int io_poll(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd, int events);
static inline void io_prep_fsync(struct iocb *iocb, int fd);
static inline int io_fsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd);
static inline void io_prep_fdsync(struct iocb *iocb, int fd);
static inline int io_fdsync(io_context_t ctx, struct iocb *iocb, io_callback_t cb, int fd);
static inline void io_set_eventfd(struct iocb *iocb, int eventfd);

二、使用方法

1、初使化io_context
2、open文件取得fd
3、根据fd,buffer offset等息建立iocb
4、submit iocb到context
5、io_getevents取得events状态
6、回到3步

三、例子

#include <unistd.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <error.h>  
#include <errno.h>  
  
#include <fcntl.h>  
#include <libaio.h>  
  
int main(int argc, char *argv[])  
{  
    // 每次读入32K字节  
    const int buffer_size = 0x8000;  
  
    // 最大事件数 32  
    const int nr_events   = 32;  
    int rt;  
  
    io_context_t ctx = {0};  
  
    // 初使化 io_context_t  
    rt = io_setup(nr_events, &ctx);  
    if ( rt != 0 )  
        error(1, rt, "io_setup");  
  
    // 依次读取参数作为文件名加入提交到ctx  
    int pagesize = sysconf(_SC_PAGESIZE);  
    for (int i=1; i<argc; ++i) {  
        iocb *cb = (iocb*)malloc(sizeof(iocb));  
        void *buffer;  
        // 要使用O_DIRECT, 必须要对齐  
        posix_memalign(&buffer, pagesize, buffer_size);  
        io_prep_pread(cb, open(argv[i], O_RDONLY | O_DIRECT), buffer, buffer_size, 0);  
        rt = io_submit(ctx, 1, &cb);  
        if (rt < 0)  
            error(1, -rt, "io_submit %s", argv[i]);;  
    }  
  
    io_event events[nr_events];  
    iocb     *cbs[nr_events];  
  
    int remain = argc - 1;  
    int n      = 0;  
  
    // 接收数据最小返回的请求数为1,最大为nr_events  
    while (remain && (n = io_getevents(ctx, 1, nr_events, events, 0))) {  
        int nr_cbs = 0;  
        for (int i=0; i<n; ++i) {  
            io_event &event = events[i];  
            iocb     *cb    = event.obj;  
            // event.res为unsigned  
            //printf("%d receive %d bytes\n", cb->aio_fildes, event.res);  
            if (event.res > buffer_size) {  
                printf("%s\n", strerror(-event.res));  
            }  
            if (event.res != buffer_size || event.res2 != 0) {  
                --remain;  
                // 释放buffer, fd 与 cb  
                free(cb->u.c.buf);  
                close(cb->aio_fildes);  
                free(cb);  
            } else {  
                // 更新cb的offset  
                cb->u.c.offset += event.res;  
                cbs[nr_cbs++] = cb;  
            }  
        }  
  
        if (nr_cbs) {  
            // 继续接收数据  
            io_submit(ctx, nr_cbs, cbs);  
        }  
    }  
    return 0;  
}  

运行

$ truncate foo.txt -s 100K
$ truncate foo2.txt -s 200K
$ g++ -O3 libaio_simple.cc -laio && ./a.out foo.txt foo2.txt
3 received 32768 bytes
4 received 32768 bytes
3 received 32768 bytes
4 received 32768 bytes
3 received 32768 bytes
4 received 32768 bytes
3 received 4096 bytes
3 done.
4 received 32768 bytes
4 received 32768 bytes
4 received 32768 bytes
4 received 8192 bytes
4 done.

四、AIO与epoll

在使用AIO时,需要通过系统调用<tt>io_getevents</tt><tt>获取已经完成的</tt><tt>IO</tt><tt>事件,而</tt>系统调用<tt>io_getevents</tt><tt>是阻塞的,所以有</tt><tt>2</tt><tt>种方式:</tt><tt>(1)</tt><tt>使用多线程,用专门的线程调用</tt><tt>io_getevents</tt><tt>,参考</tt><tt>MySQL5.5</tt><tt>及以上版本;</tt><tt>(2)</tt><tt>对于单线程程序,可以通过</tt><tt>epoll</tt><tt>来使用</tt><tt>AIO</tt><tt>;不过,这需要系统调用</tt><tt>eventfd</tt><tt>的支持,而该系统调用只在</tt><tt>2.6.22</tt><tt>之后的内核才支持。</tt>

eventfd 是 Linux-native aio 其中的一个 API,用来生成 file descriptors,这些 file descriptors 可为应用程序提供更高效 “等待/通知” 的事件机制。和 pipe 作用相似,但比 pipe 更好,一方面它只用到一个 file descriptor(pipe 要用两个),节省了内核资源;另一方面,eventfd 的缓冲区管理要简单得多,pipe 需要不定长的缓冲区,而 eventfd 全部缓冲只有定长 8 bytes。

关于AIO与epoll的结合,请参考:

nginx 0.8.x稳定版对linux aio的支持(http://www.pagefault.info/?p=76)

五、AIO与direct IO

AIO需要与direct IO结合。

关于direct IO的简单实现,可以参考:

Linux 中直接 I/O 机制的介绍

http://www.ibm.com/developerworks/cn/linux/l-cn-directio/index.html
文章转载自:
https://www.cnblogs.com/skyofbitbit/p/3655981.html
https://blog.csdn.net/brucexu1978/article/details/7085924

上一篇下一篇

猜你喜欢

热点阅读