[redis 源码走读] 多线程通信 I/O

2020-08-09  本文已影响0人  wenfh2020

本章重点走读 redis 网络 I/O 的多线程部分源码。

哈希表 + 内存数据库 + 非阻塞系统调用 + 多路复用 I/O 事件驱动,使得 redis 单线程处理主逻辑足够高效。当并发上来后,数据的逻辑处理肯定要占用大量时间,那样,客户端与服务端通信处理就会变得迟钝。所以在合适的时候(根据任务量自适应)采用多线程处理,充分地利用多核优势,分担主线程压力,使得客户端和服务端通信更加敏捷。


redis 6.0 新增多线程处理网络 I/O,默认是关闭的,需要修改配置开启。对于这个新特性,redis 作者建议:如果项目确实遇到性能问题,再开启多线程处理网络读写事件。否则开启没什么意义,还会浪费 CPU 资源。线程数量不要超过 cpu 核心数量 - 1,预留一个核心。


🔥 文章来源:wenfh2020.com


1. 配置

多线程这两个设置项,默认是关闭的。

# redis.conf

# 配置多线程处理线程个数,数量最好少于 cpu 核心,默认 4。
# io-threads 4
#
# 多线程是否处理读事件,默认关闭。
# io-threads-do-reads no

redis 作者建议:


2. 主线程工作流程

redis 多线程I/O通信流程

流程图来源:《redis 异步网络I/O通信流程 - 多线程

  1. 主线程通过事件驱动从内核获取就绪事件,记录下需要延时操作的客户端连接。
  2. 多线程并行处理延时读事件。
  3. 多线程处理延时写事件。
  4. 重新执行第一步,循环执行。

int main(int argc, char **argv) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 向内核获取就绪的可读可写事件事件进行处理,处理时钟事件。
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
    // 从内核中取出就绪的可读可写事件。
    numevents = aeApiPoll(eventLoop, tvp);

    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);

    for (j = 0; j < numevents; j++) {
        // 处理读写事件。
    }
    ...
    // 处理时钟事件。
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    ...
}
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    // write
    handleClientsWithPendingWritesUsingThreads();
    ...
}

void afterSleep(struct aeEventLoop *eventLoop) {
    ...
    // read
    handleClientsWithPendingReadsUsingThreads();
}

3. 多线程协作

redis 多线程I/O通信流程

流程图来源:《redis 异步网络I/O通信流程 - 多线程


3.1. 特点

主线程实现主逻辑,子线程辅助实现任务。


3.2. 忙等

多线程模式,存在忙等现象,这个处理有点超出了常规思维。


3.2.1. 源码实现

// write
int handleClientsWithPendingWritesUsingThreads(void) {
    ...
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    ...
}

// read
int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    ...
}
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        ...
    }
}

3.2.2. 优缺点

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    // 如果单线程模式就直接返回。
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

3.3. 源码分析

3.3.1. 概述


3.3.2. 源码

// 最大线程个数。
#define IO_THREADS_MAX_NUM 128

// 线程读操作。
#define IO_THREADS_OP_READ 0

// 线程写操作。
#define IO_THREADS_OP_WRITE 1

// 线程数组。
pthread_t io_threads[IO_THREADS_MAX_NUM];

// 互斥变量数组,提供主线程上锁和解锁子线程工作。
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];

// 原子变量数组,分别存储每个线程要处理的延时处理链接数量。主线程用来统计线程是否处理完等待事件,从而进行下一步操作。
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

// 是否启动了多线程处理模式。
int io_threads_active;

// 线程操作类型。多线程每次只能处理一种类型的操作:读/写。
int io_threads_op;

// 子线程列表,子线程个数为 IO_THREADS_MAX_NUM - 1,因为主线程也会处理延时任务。
list *io_threads_list[IO_THREADS_MAX_NUM];

void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    if (server.io_threads_num == 1) return;

    // 检查配置的线程数量是否超出限制。
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // 创建 server.io_threads_num - 1 个子线程。
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();

        // 0 号线程不创建,0 号就是主线程,主线程也会处理任务逻辑。
        if (i == 0) continue;

        // 创建子线程,主线程先对子线程上锁,挂起子线程,不让子线程进入工作模式。
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
void startThreadedIO(void) {
    serverAssert(io_threads_active == 0);
    for (int j = 1; j < server.io_threads_num; j++)
        // 子线程因为上锁等待主线程解锁,当主线程解锁子线程,子线程重新进入工作状态。
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}
void *IOThreadMain(void *myid) {
    // 每个线程在创建的时候会产生一个业务 id。
    long id = (unsigned long)myid;

    while(1) {
        // 替代 sleep,用忙等,这样能实时处理业务。但是也付出了耗费 cpu 的代价。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 留机会给主线程上锁,挂起当前子线程。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        // 根据操作类型,处理对应的读/写逻辑。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
    }
}
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    // 如果单线程模式就直接返回。
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}
void stopThreadedIO(void) {
    // 在停止线程前,仍然有等待处理的延时读数据处理,需要先处理再停止线程。
    handleClientsWithPendingReadsUsingThreads();

    serverAssert(io_threads_active == 1);

    // 主给子线程上锁,挂起子线程。
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    io_threads_active = 0;
}
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    // 将等待处理的链接,通过取模放进不同的队列中去。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 分别统计每个队列要处理链接的个数。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程处理第一个队列。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 读客户端发送的数据到缓存。
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 主线程处理完任务后,忙等其它线程,全部线程处理完任务后,再处理命令实现逻辑。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    /* 主线程处理命令逻辑,因为链接都标识了等待状态,读完数据后命令对应的业务逻辑还没有被处理。
     * 这里去掉等待标识,处理命令业务逻辑。*/
    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~ CLIENT_PENDING_COMMAND;
            // 读取数据,解析协议取出命令参数,执行命令,填充回复缓冲区。
            processCommandAndResetClient(c);
        }
        // 继续解析协议,取出命令参数,执行命令,填充回复缓冲区。
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);
    return processed;
}
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0;

    // 如果延时写事件对应的 client 链接很少,关闭多线程模式,用主线程处理异步逻辑。
    if (stopThreadedIOIfNeeded()) {
        // 处理延时写事件。
        return handleClientsWithPendingWrites();
    }

    if (!io_threads_active) startThreadedIO();

    // 将等待处理的链接,通过取模放进不同的队列中去,去掉延迟写标识。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 线程处理写事件。
    io_threads_op = IO_THREADS_OP_WRITE;

    // 分别统计每个队列要处理链接的个数。
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程处理第一个队列。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 写数据,发送给回复给客户端。
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 主线程处理完任务后,忙等其它线程,全部线程处理完任务后,再处理命令实现逻辑。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 如果缓存中还有没有发送完的数据,继续发送或者下次继续发,否则从事件驱动删除 fd 注册的可写事件。
        if (clientHasPendingReplies(c)
            && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

4. 数据结构

redisServerclient 分别 redis 是服务端和客户端的数据结构,理解结构的成员作用是走读源码逻辑的关键。有兴趣的朋友下个断点跑下逻辑,细节就不详细展开了。

用 gdb 调试 redis

// server.h
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;
    ...
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    ...
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
    ...
}
struct redisServer {
    ...
    list *clients;              /* List of active clients */
    list *clients_to_close;     /* Clients to close asynchronously */
    list *clients_pending_write; /* There is to write or install handler. */
    list *clients_pending_read;  /* Client has pending read socket buffers. */
    ...
}

5. 测试

8 核心,16G 内存, mac book 本地测试。

redis 服务默认开 4 线程,压测工具开 2 线程。有剩余核心处理机器的其它业务,这样不影响 redis 工作。

Linux 系统,如果安装不了 redis 最新版本,请升级系统 gcc 版本。

# redis.conf

io-threads 4
io-threads-do-reads yes

命令逻辑已整理成脚本,放到 github,顺手录制了测试视频:压力测试 redis 多线程处理网络 I/O

# 压测工具会模拟多个终端,防止超出限制,被停止。
ulimit -n 16384

# 可以设置对应的链接数/包体大小进行测试。
./redis-benchmark -c xxxx -r 1000000 -n 100000 -t set,get -q --threads 2  -d yyyy

在 mac book 上测试,从测试结果看,多线程没有单线程好。看到网上很多同学用压测工具测试,性能有很大的提升,有时间用其它机器跑下。可能是机器配置不一样,但是至少一点,这个多线程功能目前还有很大的优化空间,所以新特性,还需要放到真实环境中测试过,才能投产。

redis 压测过程

6. 总结


7. 参考

上一篇 下一篇

猜你喜欢

热点阅读