第三章 3.2.13 Dispatch I/O

2018-07-26  本文已影响3人  MrSYLong

Dispatch I/O和Dispatch Data可以实现输入/输出做到多个线程并列读取。
Dispatch I/O读写文件时,使用Global Dispatch Queue将一个文件按某个大小read/write。
分割读取的数据使用Dispatch Data可以更为简单的进行结合和分割。

// Apple System Log API用的源代码
static int
_asl_auxiliary(aslmsg msg, const char *title, const char *uti, const char *url, int *out_fd)
{
    asl_msg_t *merged_msg;
    asl_msg_aux_t aux;
    asl_msg_aux_0_t aux0;
    fileport_t fileport;
    kern_return_t kstatus;
    uint32_t outlen, newurllen, len, where;
    int status, fd, fdpair[2];
    caddr_t out, newurl;
    dispatch_queue_t pipe_q;
    dispatch_io_t pipe_channel;
    dispatch_semaphore_t sem;
    /* ..... 此处省略若干代码.....*/
    
    // 创建串行队列
    pipe_q = dispatch_queue_create("PipeQ", NULL);
    // 生成Dispatch I/O,指定发生错误时执行处理的Block,以及执行该Block的Dispatch Queue。 
    pipe_channel = dispatch_io_create(DISPATCH_IO_STREAM, fd, pipe_q, ^(int err){
        close(fd);
    });
    
    *out_fd = fdpair[1];
    
    // 该函数设定一次读取的大小(分割大小)
    dispatch_io_set_low_water(pipe_channel, SIZE_MAX);
    // 使用Global Dispatch Queue并列读取,当每个分割的文件块读取结束,将Dispatch Data传递给回调的Block.
    dispatch_io_read(pipe_channel, 0, SIZE_MAX, pipe_q, ^(bool done, dispatch_data_t pipedata, int err){
        if (err == 0) // err等于0 说明读取无误
        {
            // 读取完“单个文件块”的大小
            size_t len = dispatch_data_get_size(pipedata);
            if (len > 0)
            {
                // 定义一个字节数组bytes
                const char *bytes = NULL;
                char *encoded;
                
                dispatch_data_t md = dispatch_data_create_map(pipedata, (const void **)&bytes, &len);
                encoded = asl_core_encode_buffer(bytes, len);
                asl_set((aslmsg)merged_msg, ASL_KEY_AUX_DATA, encoded);
                free(encoded);
                _asl_send_message(NULL, merged_msg, -1, NULL);
                asl_msg_release(merged_msg);
                dispatch_release(md);
            }
        }
        
        if (done)
        {
            dispatch_semaphore_signal(sem);
            dispatch_release(pipe_channel);
            dispatch_release(pipe_q);
        }
    });

// 异步串行读取文件
NSString *desktop = @"/Users/xxxx/Desktop";
NSString *path = [desktop stringByAppendingPathComponent:@"整理.md"];
dispatch_queue_t queue = dispatch_queue_create("queue", NULL);

dispatch_fd_t fd = open(path.UTF8String, O_RDONLY, 0);
dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
    close(fd);
});

size_t water = 1024 * 1024;
dispatch_io_set_low_water(io, water);
dispatch_io_set_high_water(io, water);

long long fileSize = [[NSFileManager defaultManager] attributesOfItemAtPath:path error:nil].fileSize;
NSMutableData *totalData = [[NSMutableData alloc] init];

dispatch_io_read(io, 0, fileSize, queue, ^(bool done, dispatch_data_t  _Nullable data, int error) {
    
    if (error == 0) {
        size_t len = dispatch_data_get_size(data);
        if (len > 0) {
            [totalData appendData:(NSData *)data];
        }
    }
    
    if (done) {
        NSString *str = [[NSString alloc] initWithData:totalData encoding:NSUTF8StringEncoding];
        NSLog(@"%@", str);
    }
});
// 异步并行读取文件
NSString *desktop = @"/Users/xxx/Desktop";
NSString *path = [desktop stringByAppendingPathComponent:@"整理.md"];

dispatch_queue_t queue = dispatch_queue_create("queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_fd_t fd = open(path.UTF8String, O_RDONLY);
dispatch_io_t io = dispatch_io_create(DISPATCH_IO_RANDOM, fd, queue, ^(int error) {
    close(fd);
});

off_t currentSize = 0;
long long fileSize = [[NSFileManager defaultManager] attributesOfItemAtPath:path error:nil].fileSize;

size_t offset = 1024 * 1024;
dispatch_group_t group = dispatch_group_create();
NSMutableData *totalData = [[NSMutableData alloc] initWithLength:fileSize];

for (; currentSize <= fileSize; currentSize += offset) {
    dispatch_group_enter(group);
    dispatch_io_read(io, currentSize, offset, queue, ^(bool done, dispatch_data_t  _Nullable data, int error) {
        
        if (error == 0) {
            size_t len = dispatch_data_get_size(data);
            if (len > 0) {
                const void *bytes = NULL;
                (void)dispatch_data_create_map(data, (const void **)&bytes, &len);
                [totalData replaceBytesInRange:NSMakeRange(currentSize, len) withBytes:bytes length:len];
            }
        }
        
        if (done) {
            dispatch_group_leave(group);
            
            NSString *str = [[NSString alloc] initWithData:totalData encoding:NSUTF8StringEncoding];
            NSLog(@"%@", str);
        }
    });
}
上一篇下一篇

猜你喜欢

热点阅读