Redis主体流程分析

2019-10-22  本文已影响0人  达微

网上分析Redis源码的文章挺多,如黄健宏的《Redis设计与实现》就很详尽的分析了redis源码,很赞。前不久看到Paul Smith的较早年份的大作《Redis:under the hood》,受益匪浅,如此从整体上对redis原理有个大的把控,不过多纠结于细节,甚好。这里我用的版本是redis2.4.18版本,跟Paul Smith的版本有所不同,不过主体流程没有太多变化,这篇文章基本是对 《Redis:under the hood》的翻译和注解。

1 启动

让我们从 src/redis.c 中的main() 函数开始。

全局服务器状态初始化

首先, initServerConfig() 函数被调用。这部分主要是初始化 server 变量,它是一个 struct redisServer类型,用于保存redis服务器全局状态。

// redis.h:388
struct redisServer {
    pthread_t mainthread;
    int arch_bits;
    int port;
    char *bindaddr;
    char *unixsocket;
    mode_t unixsocketperm;
    int ipfd;
    int sofd;
    redisDb *db; 
    list *clients; // 客户端列表
    dict *commands; // 命令字典,key为命令名如get,值为redisCommand类型。
    unsigned lruclock:22;        /* clock incrementing every minute, for LRU */
    unsigned lruclock_padding:10;
    ...
}

// redis.c:71
struct redisServer server;

redisServer结构体有很多成员,主要可以分为下面几种类型:

initServerConfig()的作用是设置redis server的默认配置。

设置redis命令表

上一节提到redis的命令存储在server.commands这个字典中,其中key为命令名字,value为 redisCommand 结构体,其定义如下:

// redis.c:73
struct redisCommand readonlyCommandTable[] = {
    {"get",getCommand,2,0,NULL,1,1,1},
    {"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0},
    {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
    {"strlen",strlenCommand,2,0,NULL,1,1,1},
    {"del",delCommand,-2,0,NULL,0,0,0},
    ...
}

// redis.c:561
typedef void redisCommandProc(redisClient *c);

// redis.c:563
struct redisCommand {
    char *name;  // 命令名,如get
    redisCommandProc *proc; // 命令对应函数,如getCommand
    int arity; // 参数个数,如2
    int flags; // 标记,如set命令为REDIS_CMD_DENYOOM,表示在内存不够时不再处理set命令。
    redisVmPreloadProc *vm_preload_proc; 
    int vm_firstkey; /* The first argument that's a key (0 = no keys) */
    int vm_lastkey;  /* THe last argument that's a key */
    int vm_keystep;  /* The step between first and last key */
};

其中 readonlyCommandTable数组就是命令集合,redisCommand各字段分别是命名名,命令函数,参数个数,oom标记以及vm相关参数。

解析配置文件并更新配置

接下来会判断启动参数个数,如果参数个数为2:

解析配置文件是loadServerConfig()函数完成的,通过fgets一行行读取redis配置文件并更新服务器的配置。在这个函数里面可以看到若干的if else语句,一些所谓的编程书籍不提倡这样,包括goto使用等,然而大师级的程序员并不在意这些细节,所谓编程无定法,境界高就是可以为所欲为的。

从代码中可以发现,redis配置也可以不指定配置文件,而是在标准输入指定,运行src/redis-server -,然后在命令行输入配置即可。

开启daemon

如果配置了以守护进程运行,则会调用daemonize()函数通过fork()创建子进程然后子进程调用setsid()创建一个新的会话,并将标准输入输出,错误输出冲形象到/dev/null

有些书上会写运行守护进程要fork()两次,其实通过setsid()创建了新的会话的话,没有必要fork()两次,redis就是这么做的。

如果以守护进程运行,后面还需调用createPidFile()创建pid文件,默认路径是/var/run/redis.pid

初始化服务器

在上面步骤完成后,就调用initServer()函数来初始化服务器了。

信号设置

先是信号设置。忽略SIGHUP, SIGPIPE信号,然后通过setupSignalHandlers()设置TERM信号等处理函数为 sigtermHandler()等。

成员初始化

接着是初始化server成员变量,如客户端链表clients,slave链表slaves,这些列表都是双向链表struct list

创建事件循环对象

接着是调用aeCreateEventLoop()创建Event Loop 并赋值给 server.el变量。它的类型是 aeEventLoop,定义如下:

// ae.h:89
typedef struct aeEventLoop { 
    int maxfd;
    long long timeEventNextId;
    aeFileEvent events[AE_SETSIZE]; /* Registered events */
    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd; 
    int mask;
} aeFiredEvent;

aeEventLoop 包括时间事件链表头 timeEventHead ,文件事件数组 aeFileEvent数组,以及待处理的文件事件数组 aeFiredEvent(AE_SETSIZE为10240)。

可以看到文件事件结构体aeFileEvent中字段mask是标记,表示事件类型读/写。而rfileProc则是读取事件处理函数,而wfileProc是写事件处理函数。而待处理的文件事件 aeFiredEvent 则只包含了需要处理的文件描述符fd和它的读写标记mask。

而时间事件则是 aeTimeEvent 类型,存储的包括时间事件ID,时间事件执行时间(秒 when_sec 和 毫秒 when_ms),此外还有时间事件的处理函数 timeProc 等。这是一个单向链表结构,next指向下一个时间事件,时间事件和文件事件最后都是在redis服务器的大循环中处理的。

服务器监听

如果指定了端口,则会启动anetTcpServer并开始监听。监听端口默认为6379,配置文件可以指定绑定的ip和端口。对应文件描述符为ipfd。如果是设置的unixsocket,则启动anetUnixServer,对应文件描述符为sofd。

这里跟我们平时写WEB服务器程序基本一致,只是稍作了封装,流程也是通用的socket(),bind(),listen()。

int anetTcpServer(char *err, int port, char *bindaddr)
{
    int s;       
    struct sockaddr_in sa;

    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
        return ANET_ERR;

    memset(&sa,0,sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port); 
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
        anetSetError(err, "invalid bind address");
        close(s);
        return ANET_ERR;
    }    
    if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
        return ANET_ERR;
    return s;
}

数据库

initServer() 中还完成了数据库初始化。默认是16个db,对应类型为dbDictType,id为0-15。此外还要初始化过期键字典expires,阻塞键字典 blocking_keys,观察键字典 watched_keys等。

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *io_keys;              /* Keys with clients waiting for VM I/O */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;
} redisDb;

redis的持久化分为两种方式,rdb和aof。其中rdb是使用二进制格式存储数据,包括键值类型和数据,过期时间等,saveparams里面指定存储条件,详细格式说明见 Redis-RDB-Format。aof则是按redis命令存储,重启后可以按照aof中的命令重放恢复数据。

注册时间事件

接着在server.el中注册时间事件。这个就是1毫秒后要开始执行的事件 serverCron(),主要是为了在服务器启动后马上运行它,后面该函数100毫秒执行一次。这个函数要做很多事情,主要包括:

注册文件事件

那之前我们创建了一个TcpServer而且已经开始监听,需要对客户端连接事件进行注册和处理。这在Linux上面是通过 epoll 来实现的。

aeCreateFileEvent()函数主要是设置aeFileEvent结构体的值,包括指定该文件事件是读还是写,根据读写事件指定对应的处理函数 rfileProc和 wfileProc。这里对tcp服务器指定的函数是 acceptTcpHandler()。最终都是通过 aeApiAddEvent() 函数使用 epoll_ctl() 将 tcp socket的fd注册到epoll中,客户端连接的命令处理都是在 acceptTcpHandler()中完成,这个函数后面分析。

// redis.c:980
aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL);

// ae.c:88
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

// ae_epoll.c:29
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {           
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD           
     * operation. Otherwise we need an ADD operation. */                       
    int op = eventLoop->events[fd].mask == AE_NONE ?                           
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;                                     

    ee.events = 0; 
    mask |= eventLoop->events[fd].mask; /* Merge old events */                 
    if (mask & AE_READABLE) ee.events |= EPOLLIN;                              
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;                             
    ee.data.u64 = 0; /* avoid valgrind warning */                              
    ee.data.fd = fd;                                                           
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;                     
    return 0;
}

其他

恢复数据

如果开启了AOF,则从AOF文件重放命令来恢复redis数据。否则,则是从RDB文件恢复数据。每次进行RDB持久化时,redis都是将内存中的数据库的数据全部写到文件中,不是增量的持久化。

if (server.appendonly) {
    if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
        redisLog(REDIS_NOTICE,"DB loaded from append only file: %ld seconds",time(NULL)-start);
} else {
    if (rdbLoad(server.dbfilename) == REDIS_OK) {
        redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",
            time(NULL)-start);
    } else if (errno != ENOENT) {
        redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
        exit(1);
    }    
}  

建立事件循环

接着,redis注册beforeSleep()函数到事件循环中,这个函数在每次进入事件循环时首先调用它,它主要做两件事:

进入事件循环

redis接着正式调用 aeMain() 函数进入事件循环。当有时间事件或者文件事件需要处理时,会调用他们对应的处理函数进行处理。aeProcessEvents()封装了处理函数,时间事件通过自定义的函数处理,而文件事件则通过epoll或者kqueue或者select系统调用来处理,在Linux里面通常使用的是epoll。

aeProcessEvents() 会优先处理文件事件,其次才是处理时间事件。文件事件就是通过 aeApiPoll() 函数来获取事件,并将触发的事件加入到 server.el.fired 数组中,最终就是调用epoll_wait()获取事件,其中超时时间设置的是距离最近一次时间事件的时间,这样如果没有文件事件也不会太耽误时间事件执行。获取到文件事件后,会根据事件类型是读还是写调用相应的方法处理。如读事件就是调用的 acceptTcpHandler() 处理的。

接着处理时间事件,从 server.el.timeEventHead 可以拿到时间事件链表的头,遍历该链表,如果有时间事件的执行时间到了,则执行对应的函数即可。这里有个地方注意下,如果时间事件处理函数返回值不是-1,则表示该时间事件需要定期执行,需要设置该事件下一次执行时间而不是从时间事件链表移除它,如serverCron这个时间事件,就是这样的定期执行事件,100ms执行一次。

2 请求处理和响应

现在服务器已经启动完毕了,接下来看看redis是如何在事件循环中接收客户端请求并处理请求的,这里以 TCP 方式为例分析,unix socket的类似。

接收连接

redis处理客户端请求是在函数 acceptTcpHandler() 完成的。这个函数通过 anetTcpAccept() 接收客户端请求,然后调用的 acceptCommandHandler() 来处理客户端请求。

在acceptCommandHandler最终调用的是 createClient(fd)函数创建了redisClient对象,初始化该对象的变量,并将该连接fd注册到事件循环中,事件类型为 AE_READABLE(EPOOLIN)。该事件处理函数为 readQueryFromClient(),用于处理客户端连接的命令。这样,我们之前注册了listenfd,用于在新连接到来时接收新连接,现在将客户端连接fd注册到事件循环,完成客户端命令处理。注意这里必须通过anetNonBlock(NULL, fd)将客户端连接fd设置为非阻塞的。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd; 
    char cip[128];
    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
        return;
    }    
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    acceptCommonHandler(cfd);
}

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    c->bufpos = 0;

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }
    ...
}

从客户端读取命令

当客户端发送命令时,会通过readQueryFromClient()处理。它每次读取最多REDIS_IOBUF_LEN(16*1024)16K字节到缓存数组buf中,最后将缓存的数据拷贝到 redisClient->querybuf 中,然后调用 processInputBuffer() 函数处理客户端命令。

processInputBuffer() 解析客户端的原始命令字符串并将命令参数设置到 redisClient->argv 数组中,命令参数是 redisObject 类型的结构体。注意命令类型有两种,我们通过 redis-cli 发送的命令类型为 REDIS_REQ_MULTIBULK,这种命令以*开头,符合 redis protocol,调用processMultibulkBuffer()函数处理。另外一种命令是 REDIS_REQ_INLINE,这种命令是你通过其他工具连接的时候发的,比如通过 telnet localhost 6379,这种命令是直接的原生字符串,没有使用 redis protocol封装客户端命令。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;

    server.current_client = c;
    nread = read(fd, buf, REDIS_IOBUF_LEN);
    ...
    processInputBuffer(c);
    server.current_client = NULL;
}

void processInputBuffer(redisClient *c) {
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }

        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}

processInputBuffer()解析到完整命令后,便会调用 processCommand()函数开始处理客户端命令。先通过lookupCommand根据命令字符串找到命令的处理程序。在调用命令处理程序执行实际的命令前,会先执行一系列的检查:

执行命令

在上一节的检查OK后执行 call() 函数真正开始执行命令,它调用的是 redisCommand 的proc指向的函数,为 redisCommandProc 类型对象。命令执行完后,会通过 addReply() 函数将执行结果缓存到 redisClient 的 buf 数组中。

那这个响应数据什么时候会发送给客户端呢?这是因为在 addReply() 中会调用 _installWriteEvent(),该函数就是将客户端连接的fd加入到事件循环中,事件类型为AE_WRITABLE(EPOLLOUT),然后将响应数据通过_addReplyToBuffer()_addReplyObjectToList()写入到响应缓存redisClient->buf和redisClient->reply中。这里的buf和reply两个地方都是用于写响应缓存的,如果响应的总的数据长度(响应数据长度+数据本身)小于 REDIS_REPLY_CHUNK_BYTES(7500)字节,则用buf数组缓存数据,否则用reply链表来存储数据。

当下一个事件循环到来时,会读取到该客户端连接fd,然后通过函数 sendReplyToClient()从响应缓存读取数据并发送响应数据给客户端,然后移除写事件。命令执行完成后,redis会重置redisClient对象并接收后续命令。

void call(redisClient *c) {
    long long dirty, start = ustime(), duration;

    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

    if (server.appendonly && dirty > 0) 
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}

写操作如SET/ZADD等会让redis服务器变的dirty,后台IO线程执行的BGSAVE很重要,它会根据时间和修改过的key的数目来刷数据到rdb中。而如果开启了AOF还需要刷新AOF文件到磁盘。feedAppendOnlyFile 是将客户端命令写入到AOF文件中,所以可以通过AOF文件重放来恢复数据。当然这里会对expire,setex等命令做些转换,将过期时间设置为绝对时间,添加必要的SELECT db的命令等。另外则是直接通过 catAppendOnlyGenericCommand() 将命令写入到AOF文件。

如果有slave连接到该服务器,则通过 replicationFeedSlaves() 将命令发给slave服务器(slave同步时master会先通过BGSAVE保存一份rdb并发送给slave,后续的命令同步则由replicationFeedSlaves()来完成)。如果有客户端通过 monitor 命令连接到该服务器,则还要通过 replicationFeedMonitors() 发送命令字符串过去,并带上命令时间戳。

3 总结

这篇文章主要对redis的初始化流程做了分析,包括启动时配置初始化,使用epoll支持高并发,读取解析和响应命令等,流程图如下(来自 Redis:under the hood 博客)。这里还有篇 Paul Smith的大作 More Redis internals: Tracing a GET & SET 用于跟踪 redis 的 GET/SET 命令流程的,值得一看。

image

4 参考资料

517909-20160120083937156-999049539.png
上一篇下一篇

猜你喜欢

热点阅读