14 Redis 6.0多IO线程的效率提高了吗

2023-10-12  本文已影响0人  滔滔逐浪

多 IO 线程的初始化
在InitServerLast 函数后,新增了initThreadedIO 函数。
initThreadedIO 函数的主要执行流程:

首先,initThreadedIO 函数会设置 IO 线程的激活标志。initThreadedIO 函数会把 io_threads_active 初始化为 0,表示 IO 线程还没有被激活。

initThreadedIO 函数会对设置的 IO 线程数量进行判断。这个数量就是保存在全局变量 server 的成员变量 io_threads_num 中的。

如果 IO 线程数量为 1,就表示只有 1 个主 IO 线程,initThreadedIO 函数就直接返回了。
如果 IO 线程数量大于宏定义 IO_THREADS_MAX_NUM(默认值为 128),那么 initThreadedIO 函数会报错,并退出整个程序。
如果 IO 线程数量大于 1,并且小于宏定义 IO_THREADS_MAX_NUM,那么,initThreadedIO 函数会执行一个循环流程,该流程的循环次数就是设置的 IO 线程数量。

在该循环流程中,initThreadedIO 函数就会给以下四个数组进行初始化操作。

io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表;
io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
io_threads_mutex 数组:保存线程互斥锁;
io_threads 数组:保存每个 IO 线程的描述符。

在对这些数组进行初始化的同时,initThreadedIO 函数还会根据 IO 线程数量,调用 pthread_create 函数创建相应数量的线程。对于 initThreadedIO 函数来说,它创建的线程要运行的函数是 IOThreadMain,参数是当前创建线程的编号。不过要注意的是,这个编号是从 1 开始的,编号为 0 的线程其实是运行 Redis server 主流程的主 IO 线程。

IO 线程的运行函数 IOThreadMain
IOThreadMain 函数也是在 networking.c 文件中定义的,它的主要执行逻辑是一个 while(1) 循环。在这个循环中,IOThreadMain 函数会把 io_threads_list 数组中,每个 IO 线程对应的列表读取出来。
IOThreadMain 函数会从每个 IO 线程对应的列表中,进一步取出要处理的客户端,然后判断线程要执行的操作标记。这个操作标记是用变量 io_threads_op 表示的,它有两种取值:

IO_THREADS_OP_WRITE:这表明该 IO 线程要做的是写操作,线程会调用 writeToClient 函数将数据写回客户端。
IO_THREADS_OP_READ:这表明该 IO 线程要做的是读操作,线程会调用 readQueryFromClient 函数从客户端读取数据。

Redis server 在接收到客户端请求和给客户端返回数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。然后,Redis server 在每次进入事件循环前,会再把列表中的客户端添加到 io_threads_list 数组中,交给 IO 线程进行处理。
推迟客户端读操作
Redis server 在和一个客户端建立连接后,就会开始监听这个客户端上的可读事件,而处理可读事件的回调函数是 readQueryFromClient。这个函数一开始会先从传入参数 conn 中获取客户端 c,紧接着就调用 postponeClientRead 函数,来判断是否推迟从客户端读取数据。
scss复制代码void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn); //从连接数据结构中获取客户
...
if (postponeClientRead(c)) return; //判断是否推迟从客户端读取数据
...
}

postponeClientRead 这个函数会根据四个条件判断能否推迟从客户端读取数据。
条件一:全局变量 server 的 io_threads_active 值为 1。这表示多 IO 线程已经激活。
条件二:全局变量 server 的 io_threads_do_read 值为 1。这表示多 IO 线程可以用于处理延后执行的客户端读操作。这个变量值是在 Redis 配置文件 redis.conf 中,通过配置项 io-threads-do-reads 设置的,默认值为 no,需要手动开启。
条件三:ProcessingEventsWhileBlocked 变量值为 0。这表示 processEventsWhileBlokced 函数没有在执行。ProcessingEventsWhileBlocked 是一个全局变量,它会在 processEventsWhileBlokced 函数执行时被设置为 1,在 processEventsWhileBlokced 函数执行完成时被设置为 0。
当 Redis 在读取 RDB 文件或是 AOF 文件时,这个函数会被调用,用来处理事件驱动框架捕获到的事件。所以,当 processEventsWhileBlokced 函数执行处理客户端可读事件时,这些客户端读操作是不会被推迟执行的。
条件四:客户端现有标识不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ。
CLIENT_MASTER 和 CLIENT_SLAVE 标识分别表示客户端是用于主从复制的客户端,也就是说,这些客户端不会推迟读操作。CLIENT_PENDING_READ 本身就表示一个客户端已经被设置为推迟读操作了,所以,对于已带有 CLIENT_PENDING_READ 标识的客户端,postponeClientRead 函数就不会再推迟它的读操作了。
总之,只有前面这四个条件都满足了,postponeClientRead 函数才会推迟当前客户端的读操作。postponeClientRead 函数会给该客户端设置 CLIENT_PENDING_REA 标识,并调用 listAddNodeHead 函数,把这个客户端添加到全局变量 server 的 clients_pending_read 列表中。
如何推迟客户端写操作
Redis 在执行了客户端命令,要给客户端返回结果时,会调用 addReply 函数将待返回结果写入客户端输出缓冲区。
在 addReply 函数的一开始,该函数会调用 prepareClientToWrite 函数,来判断是否推迟执行客户端写操作。prepareClientToWrite 就会调用 clientInstallWriteHandler 函数,再进一步判断能否推迟该客户端写操作。
clientInstallWriteHandler这个函数会判断两个条件:条件一:客户端没有设置过 CLIENT_PENDING_WRITE 标识,即没有被推迟过执行写操作。条件二:客户端所在实例没有进行主从复制,或者客户端所在实例是主从复制中的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求。
如何把待读客户端分配给 IO 线程执行?
handleClientsWithPendingReadsUsingThreads 函数:该函数主要负责将 clients_pending_read 列表中的客户端分配给 IO 线程进行处理。
handleClientsWithPendingReadsUsingThreads 函数的主要执行逻辑可以分成四步。
第一步,该函数会先根据全局变量 server 的 io_threads_active 成员变量,判定 IO 线程是否激活,并且根据 server 的 io_threads_do_reads 成员变量,判定用户是否设置了 Redis 可以用 IO 线程处理待读客户端。只有在 IO 线程激活,并且 IO 线程可以用于处理待读客户端时,handleClientsWithPendingReadsUsingThreads 函数才会继续执行,否则该函数就直接结束返回了。
第二步,handleClientsWithPendingReadsUsingThreads 函数会获取 clients_pending_read 列表的长度,这代表了要处理的待读客户端个数。然后,该函数会从 clients_pending_read 列表中逐一取出待处理的客户端,并用客户端在列表中的序号,对 IO 线程数量进行取模运算。
这样一来,我们就可以根据取模得到的余数,把该客户端分配给对应的 IO 线程进行处理。紧接着,handleClientsWithPendingReadsUsingThreads 函数会调用 listAddNodeTail 函数,把分配好的客户端添加到 io_threads_list 列表的相应元素中
第三步,handleClientsWithPendingReadsUsingThreads 函数会将 io_threads_list 数组 0 号列表(也就是 io_threads_list[0]元素)中的待读客户端逐一取出来,并调用 readQueryFromClient 函数进行处理。
紧接着,handleClientsWithPendingReadsUsingThreads 函数会执行一个 while(1) 循环,等待所有 IO 线程完成待读客户端的处理。
第四步,handleClientsWithPendingReadsUsingThreads 函数会再次遍历一遍 clients_pending_read 列表,依次取出其中的客户端。紧接着,它会判断客户端的标识中是否有 CLIENT_PENDING_COMMAND。如果有 CLIENT_PENDING_COMMAND 标识,表明该客户端中的命令已经被某一个 IO 线程解析过,已经可以被执行了。
此时,handleClientsWithPendingReadsUsingThreads 函数会调用 processCommandAndResetClient 函数执行命令。最后,它会直接调用 processInputBuffer 函数解析客户端中所有命令并执行。
如何把待写客户端分配给 IO 线程执行?
和待读客户端的分配处理类似,待写客户端分配处理是由 handleClientsWithPendingWritesUsingThreads 函数来完成的。该函数也是在 beforeSleep 函数中被调用的。
handleClientsWithPendingWritesUsingThreads 函数的主要流程同样也可以分成 4 步,其中,第 2、3 和 4 步的执行逻辑,和 handleClientsWithPendingReadsUsingThreads 函数类似。
在第 2 步,handleClientsWithPendingWritesUsingThreads 函数会把待写客户端,按照轮询方式分配给 IO 线程,添加到 io_threads_list 数组各元素中。
在第 3 步,handleClientsWithPendingWritesUsingThreads 函数会让主 IO 线程处理其待写客户端,并执行 while(1) 循环等待所有 IO 线程完成处理。
在第 4 步,handleClientsWithPendingWritesUsingThreads 函数会再次检查 clients_pending_write 列表中,是否还有待写的客户端。如果有的话,并且这些客户端还有留存在缓冲区中的数据,那么,handleClientsWithPendingWritesUsingThreads 函数就会调用 connSetWriteHandler 函数注册可写事件,而这个可写事件对应的回调函数是 sendReplyToClient 函数。

上一篇 下一篇

猜你喜欢

热点阅读