第三章 3.2.13 Dispatch I/O
2018-07-26 本文已影响3人
MrSYLong
Dispatch I/O和Dispatch Data可以实现输入/输出做到多个线程并列读取。
Dispatch I/O读写文件时,使用Global Dispatch Queue将一个文件按某个大小read/write。
分割读取的数据使用Dispatch Data可以更为简单的进行结合和分割。
// Apple System Log API用的源代码
static int
_asl_auxiliary(aslmsg msg, const char *title, const char *uti, const char *url, int *out_fd)
{
asl_msg_t *merged_msg;
asl_msg_aux_t aux;
asl_msg_aux_0_t aux0;
fileport_t fileport;
kern_return_t kstatus;
uint32_t outlen, newurllen, len, where;
int status, fd, fdpair[2];
caddr_t out, newurl;
dispatch_queue_t pipe_q;
dispatch_io_t pipe_channel;
dispatch_semaphore_t sem;
/* ..... 此处省略若干代码.....*/
// 创建串行队列
pipe_q = dispatch_queue_create("PipeQ", NULL);
// 生成Dispatch I/O,指定发生错误时执行处理的Block,以及执行该Block的Dispatch Queue。
pipe_channel = dispatch_io_create(DISPATCH_IO_STREAM, fd, pipe_q, ^(int err){
close(fd);
});
*out_fd = fdpair[1];
// 该函数设定一次读取的大小(分割大小)
dispatch_io_set_low_water(pipe_channel, SIZE_MAX);
// 使用Global Dispatch Queue并列读取,当每个分割的文件块读取结束,将Dispatch Data传递给回调的Block.
dispatch_io_read(pipe_channel, 0, SIZE_MAX, pipe_q, ^(bool done, dispatch_data_t pipedata, int err){
if (err == 0) // err等于0 说明读取无误
{
// 读取完“单个文件块”的大小
size_t len = dispatch_data_get_size(pipedata);
if (len > 0)
{
// 定义一个字节数组bytes
const char *bytes = NULL;
char *encoded;
dispatch_data_t md = dispatch_data_create_map(pipedata, (const void **)&bytes, &len);
encoded = asl_core_encode_buffer(bytes, len);
asl_set((aslmsg)merged_msg, ASL_KEY_AUX_DATA, encoded);
free(encoded);
_asl_send_message(NULL, merged_msg, -1, NULL);
asl_msg_release(merged_msg);
dispatch_release(md);
}
}
if (done)
{
dispatch_semaphore_signal(sem);
dispatch_release(pipe_channel);
dispatch_release(pipe_q);
}
});
// 异步串行读取文件
NSString *desktop = @"/Users/xxxx/Desktop";
NSString *path = [desktop stringByAppendingPathComponent:@"整理.md"];
dispatch_queue_t queue = dispatch_queue_create("queue", NULL);
dispatch_fd_t fd = open(path.UTF8String, O_RDONLY, 0);
dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
close(fd);
});
size_t water = 1024 * 1024;
dispatch_io_set_low_water(io, water);
dispatch_io_set_high_water(io, water);
long long fileSize = [[NSFileManager defaultManager] attributesOfItemAtPath:path error:nil].fileSize;
NSMutableData *totalData = [[NSMutableData alloc] init];
dispatch_io_read(io, 0, fileSize, queue, ^(bool done, dispatch_data_t _Nullable data, int error) {
if (error == 0) {
size_t len = dispatch_data_get_size(data);
if (len > 0) {
[totalData appendData:(NSData *)data];
}
}
if (done) {
NSString *str = [[NSString alloc] initWithData:totalData encoding:NSUTF8StringEncoding];
NSLog(@"%@", str);
}
});
// 异步并行读取文件
NSString *desktop = @"/Users/xxx/Desktop";
NSString *path = [desktop stringByAppendingPathComponent:@"整理.md"];
dispatch_queue_t queue = dispatch_queue_create("queue", DISPATCH_QUEUE_CONCURRENT);
dispatch_fd_t fd = open(path.UTF8String, O_RDONLY);
dispatch_io_t io = dispatch_io_create(DISPATCH_IO_RANDOM, fd, queue, ^(int error) {
close(fd);
});
off_t currentSize = 0;
long long fileSize = [[NSFileManager defaultManager] attributesOfItemAtPath:path error:nil].fileSize;
size_t offset = 1024 * 1024;
dispatch_group_t group = dispatch_group_create();
NSMutableData *totalData = [[NSMutableData alloc] initWithLength:fileSize];
for (; currentSize <= fileSize; currentSize += offset) {
dispatch_group_enter(group);
dispatch_io_read(io, currentSize, offset, queue, ^(bool done, dispatch_data_t _Nullable data, int error) {
if (error == 0) {
size_t len = dispatch_data_get_size(data);
if (len > 0) {
const void *bytes = NULL;
(void)dispatch_data_create_map(data, (const void **)&bytes, &len);
[totalData replaceBytesInRange:NSMakeRange(currentSize, len) withBytes:bytes length:len];
}
}
if (done) {
dispatch_group_leave(group);
NSString *str = [[NSString alloc] initWithData:totalData encoding:NSUTF8StringEncoding];
NSLog(@"%@", str);
}
});
}