redis源码阅读

2020-07-24  本文已影响0人  147258_d8b2

redis源码阅读


1.redis集群

redis集群储存数据使用哈希槽,类似于hash环。将key通过crc16编码映射为0-16383其中的一个值(共16384个哈希槽)。并将该key储存到相应的服务器上
集群储存示意图
 redis 客户端命令这里不详细解释

客户端API特征

2.客户端hiredis-vip

hiredis-vip代码建立在原有的hiredis之上,其实现了对redis集群的访问
hiredis-vip的源码比hiredis多了下面文件(只带.c):
    * hircluster.c     ----redis集群
    * adlist.c         ----C模仿list结构
    * command.c        ----传输命令
    * crc16.c          ----crc16编码
    * hiarray.c        ----C模仿array结构
    * hiutil.c         ----工具
只介绍hircluster.c和command.c,其他都是工具。
2.1.1结构体

结构体cluster_node:服务器节点信息

    typedef struct cluster_node  
    {
        sds name;
        sds addr;
        sds host;
        int port;
        uint8_t role;     /* 主服务 OR 备用服务 */
        uint8_t myself; 
        redisContext *con;
        redisAsyncContext *acon;
        struct hilist *slots;
        struct hilist *slaves;
        int failure_count;
        void *data;   
        struct hiarray *migrating; 
        struct hiarray *importing;  
    }cluster_node;

结构体redisClusterContext:与redis集群连接的上下文

    typedef struct redisClusterContext {
        int err; 
        char errstr[128]; 
        sds ip;
        int port;
        int flags;
        enum redisConnectionType connection_type;
        struct timeval *connect_timeout;
        struct timeval *timeout;
        struct hiarray *slots;
        struct dict *nodes;
        cluster_node *table[REDIS_CLUSTER_SLOTS];
        uint64_t route_version;
        int max_redirect_count;
        int retry_count;
        struct hilist *requests;
        int need_update_route;
        int64_t update_route_time;
    } redisClusterContext;

结构体redisClusterAsyncContext:与redis集群异步连接

    typedef struct redisClusterAsyncContext {
        redisClusterContext *cc;
        int err;
        char errstr[128];
        void *data;
        void *adapter;
        adapterAttachFn *attach_fn;
        redisDisconnectCallback *onDisconnect;
        redisConnectCallback *onConnect;
    } redisClusterAsyncContext;

结构体之间的关系


结构体关系图
2.1.2 集群请求

实现hiredis-vip的redis集群请求分为三步

  1. 集群结构体初始化
/*初始化结构体*/
redisClusterContext *redisClusterContextInit(void);
/*初始化结构体(调用redisClusterContextInit),添加集群节点(调用redisClusterSetOptionAddNodes),更新集群节点hash槽信息(调用cluster_update_route)*/
redisClusterContext *redisClusterConnect(const char *addrs, int flags);
redisClusterContext *redisClusterConnectWithTimeout(const char *addrs, 
const struct timeval tv, int flags);
redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags);

  1. 集群请求的相关配置
int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr);//填充redisClusterContext的nodes
int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs);//填充redisClusterContext的nodes
int redisClusterSetOptionConnectBlock(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionParseSlaves(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的connect_timeout
int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的timeout
int redisClusterSetOptionMaxRedirect(redisClusterContext *cc,  int max_redirect_count);//填充redisClusterContext的max_redirect_count
  1. 建立和发送请求
    1. 格式化命令,如:SET foo bar1 转为 *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$4\r\nbar1\r\n。*3表示3个参数,$3表示该参数占3个字符,这也是最后客户端发送给服务端的消息
    2. 创建command结构体,并将格式化的命令填充该结构体。根据key使用crc16编码计算hash值。并通过hash值找到node。
    3. 根据node创建redisContext结构体,建立socket连接,并将连接放在redisContext结构体,将格式化的命令放入结构体的缓冲区。
    4. 发送并接收请求。
/*下面3个函数是格式化命令*/
void *redisClustervCommand(redisClusterContext *cc, const char *format, va_list ap);
void *redisClusterCommand(redisClusterContext *cc, const char *format, ...);
void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
/*创建command结构体,计算key的hash值,填充该结构体*/
void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len);
/*下面四个函数将命令放入该结构体的缓冲区。*/
int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd, int len);
int redisClustervAppendCommand(redisClusterContext *cc, const char *format, va_list ap);
int redisClusterAppendCommand(redisClusterContext *cc, const char *format, ...);
int redisClusterAppendCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
/*根据node创建上下文也就是redisContext结构体。在此函数中建立socket连接,并将连接放在redisContext结构体*/
redisContext *ctx_get_by_node(redisClusterContext *cc, struct cluster_node *node);
/*执行命令,调用上面的ctx_get_by_node函数*/
int redisGetReply(redisContext *c, void **reply);
/*
 * 更新集群的node配置。
 */
int cluster_update_route(redisClusterContext *cc);
struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags);
struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags);

2.1.2 异步接口

异步接口结构体

typedef struct redisClusterAsyncContext {
    redisClusterContext *cc;
    int err;
    char errstr[128]; 
    void *data;
    void *adapter;
    adapterAttachFn *attach_fn;
    redisDisconnectCallback *onDisconnect;
    redisConnectCallback *onConnect;
} redisClusterAsyncContext;

异步方法

/*初始化redis集群结构体redisClusterContext(调用redisClusterConnectNonBlock),初始化异步结构体*/
redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs, int flags);
/*设置回调连接和断开连接的函数*/
int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn);
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn);
/*下面四个函数将命令放入该结构体的缓冲区。调用actx_get_by_node来获取redisContext结构体*/
int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, char *cmd, int len);
int redisClustervAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, va_list ap);
int redisClusterAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, ...);
int redisClusterAsyncCommandArgv(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
/*断开连接*/
void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc);
void redisClusterAsyncFree(redisClusterAsyncContext *acc);
/*异步根据node获取redisAsyncContext上下文*/
redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node);

3.hiredis


4.redis

ae事件处理

redis实现了自己的事件库,这个事件库被实现在ae.c
理解redis事件库怎样工作的最好方式是明白redis如何使用它。

Event Loop 初始化

initServer 方法定义在 redis.c ,该方法用于初始化全局变量 server(redisServer 类型结构体)。server变量中一个最重要的变量是 el(aeEventLoop 类型结构体),server.el 包含了 ae 事件。

initServer 方法通过调用 aeCreateEventLoop 方法初始化 server.el。aeEventLoop 结构体定义如下:

typedef struct aeEventLoop
{
    int maxfd;/*最大句柄*/
    long long timeEventNextId;/*定时事件的个数,*/
    aeFileEvent events[AE_SETSIZE]; /*已经注册的句柄事件*/
    aeFiredEvent fired[AE_SETSIZE]; /*已经触发的句柄事件*/
    aeTimeEvent *timeEventHead; /*定时事件链表*/
    int stop;/*循环是否结束*/
    void *apidata; 
    aeBeforeSleepProc *beforesleep;/*指针函数,每次sleep事件前调用的方法*/
    aeBeforeSleepProc *aftersleep;/*指针函数,每次sleep事件后调用的方法*/
} aeEventLoop;
/*注:1.aftersleep在原文中没有,但是在redis5代码中出现。其中sleep指阻塞或者等待,每次循环时都会等待一段时间(该时间由触发最短时间事件的时间决定)。下文会介绍。
2.redis5支持的IO复用:epoll,select,evport,kqueue。本文中以epoll为准
3.apidata储存epoll句柄和epoll_event结构体数组,epoll_
*/

aeCreateEventLoop

aeCreateEventLoop 首先分配内存给 aeEventLoop 结构体,然后调用 ae_epoll.c:aeApiCreate。aeApiCreate 会使用合适的IO复用接口

aeApiCreate 分配内存给 aeApiState。aeApiState 有两个变量:epfd用于储存 epoll_create 返回的 epoll句柄,events数组变量(struct epoll_event类型)。events数组的索引为socket句柄

aeCreateTimeEvent

aeCreateTimeEvent 接受如下参数:

aeCreateTimeEvent(server.el /*eventLoop*/, 1 /*milliseconds*/, serverCron /*proc*/, NULL /*clientData*/, NULL /*finalizerProc*/);

redis.c:serverCron 里面包含了多个定时事件。

aeCreateFileEvent

The essence of aeCreateFileEvent function is to execute epoll_ctl system call which adds a watch for EPOLLIN event on the listening descriptor create by anetTcpServer and associate it with the epoll descriptor created by a call to aeCreateEventLoop

aeCreateFileEvent函数的本质是执行epoll_ctl系统调用。aeCreateFileEvent方法使用epoll监控socket句柄,并为其配置mask表示socket句柄的读写方式。

initServer将以下参数传递给aeCreateFileEvent

这样就完成了Redis事件循环的初始化。

Event Loop Processing

main 将会调用aeMain 进行事件的循环。aeMain的循环在主进程,所以想要移植ae事件的同学注意aeMain一定要在代码的最后。

aeMain 使用while循环调用 ae.c:aeProcessEvents 。在while循环中每次都会判断是否设置了beforesleep函数,如果有就在处理事件前执行。ae.c:aeProcessEvents 循环同时处理句柄事件和定时事件。

aeProcessEvents

  1. ae.c:aeProcessEvents通过aeSearchNearestTimer找到最近执行的定时事件。并获取该时间间隔。
  2. 将最短时间间隔赋值给tvp。如果最短时间间隔为负,将tvp设置为0,如redis中第一次的定时时间为1毫秒。
  3. 调用aeApiPoll函数。aeApiPoll函数实际调用 epoll_wait来sleep,sleep时间为tvp。之后aeApiPoll函数将有响应的socket句柄放入eventLoop->fired
  4. 查看是否有aftersleep函数,有则执行。
  5. 查看eventLoop->fired中是否有需要处理的句柄,有则处理
  6. 执行processTimeEvents。processTimeEvents用于处理定时事件
  7. processTimeEvents中遍历链表执行定时任务。如果定时任务被贴上删除标记则删除。

eventLoop->fired 数组元素内容:

我们以客户端请求连接来说明那些函数被调用了。

  1. 客户端向服务器请求一个连接。
  2. aeApiPoll将会监测到服务器句柄触发可读。将服务器句柄放入 eventLoop->fired
  3. ae事件检测到eventLoop->fired有触发,调用acceptHandler
  4. acceptHandler 实际执行 accept ,获得客户端句柄。调用aeCreateFileEvent将客户端句柄放入ae事件中。如下
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
    readQueryFromClient, c) == AE_ERR) {
    freeClient(c);
    return NULL;
}

processTimeEvents

ae.c:processTimeEvents 遍历链表eventLoop->timeEventHead。如果达到时间就执行定时事件。每执行完一次定时事件,都会用 ae.c:aeAddMilliSeconds重新设置下一次执行时间。在redis中,所有的定时事件都在serverCron中。通过配置文件hz 10设置频率。默认每1000/10毫秒执行一次。

持久化

rdb的优势

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //省略...
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren())
    {
        //省略...
    } else {
        for (j = 0; j < server.saveparamslen; j++) {
            //轮流判断save规则,符合规则就执行持久化。规则见redis.conf
            struct saveparam *sp = server.saveparams+j;
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
        //省略...
    }
    //省略...
    return 1000/server.hz;
}
//转到rdbSaveBackground函数
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
     //省略...
    openChildInfoPipe();
    //fork子进程进行持久化
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        //省略...
    }
    //省略...
    return C_OK; 
}
//转到rdbSave
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    //省略...
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }
    //省略...
}
//转到rdbSaveRio,这里执行内存持久化
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    //添加文件头部
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    //添加其他变量,创建时间,内存占用,bit值
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    //循环数据库
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        //储存TYPE为数据库编号,表示后面一个字节指数据库号
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        //储存数据库号
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        //储存数据库的一些信息
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* 循环字典 */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            //储存key value值
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            //....
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }
    //脚本缓存
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }
    //结束字符
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
    //校验位,crc16编码
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
//转到rdbSaveKeyValuePair
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    //储存过期时间
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    //储存lru信息,https://www.cnblogs.com/hapjin/archive/2019/06/07/10933405.html
    if (savelru) {
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    //储存lfu信息
    if (savelfu) {
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        /* We can encode this in exactly two bytes: the opcode and an 8
         * bit counter, since the frequency is logarithmic with a 0-255 range.
         * Note that we do not store the halving time because to reset it
         * a single time when loading does not affect the frequency much. */
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }
    //储存类型
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    //储存key,value。格式为[len][date]。当value是"100","-17"这中能被编码成整形时,就储存整形形式
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val,key) == -1) return -1;
    return 1;
}
//savekey相当于save string。
//转到rdbSaveObject,储存内容。
//最后调用write持久化。
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
    ssize_t n = 0, nwritten = 0;
    if (o->type == OBJ_STRING) {
        /* Save a string value */
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
        nwritten += n;
    } else if (o->type == OBJ_LIST) {
        /* Save a list value */
        if (o->encoding == OBJ_ENCODING_QUICKLIST) {
            quicklist *ql = o->ptr;
            quicklistNode *node = ql->head;

            if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
            nwritten += n;

            while(node) {
                if (quicklistNodeIsCompressed(node)) {
                    void *data;
                    size_t compress_len = quicklistGetLzf(node, &data);
                    if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
                    nwritten += n;
                } else {
                    if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
                    nwritten += n;
                }
                node = node->next;
            }
        } else {
            serverPanic("Unknown list encoding");
        }
    } else if (o->type == OBJ_SET) {
        /* Save a set value */
        if (o->encoding == OBJ_ENCODING_HT) {
            dict *set = o->ptr;
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;

            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
                dictReleaseIterator(di);
                return -1;
            }
            nwritten += n;

            while((de = dictNext(di)) != NULL) {
                sds ele = dictGetKey(de);
                if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
                    == -1)
                {
                    dictReleaseIterator(di);
                    return -1;
                }
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else if (o->encoding == OBJ_ENCODING_INTSET) {
            size_t l = intsetBlobLen((intset*)o->ptr);

            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else {
            serverPanic("Unknown set encoding");
        }
    } else if (o->type == OBJ_ZSET) {
        /* Save a sorted set value */
        if (o->encoding == OBJ_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);

            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = o->ptr;
            zskiplist *zsl = zs->zsl;

            if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
            nwritten += n;

            /* We save the skiplist elements from the greatest to the smallest
             * (that's trivial since the elements are already ordered in the
             * skiplist): this improves the load process, since the next loaded
             * element will always be the smaller, so adding to the skiplist
             * will always immediately stop at the head, making the insertion
             * O(1) instead of O(log(N)). */
            zskiplistNode *zn = zsl->tail;
            while (zn != NULL) {
                if ((n = rdbSaveRawString(rdb,
                    (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
                {
                    return -1;
                }
                nwritten += n;
                if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
                    return -1;
                nwritten += n;
                zn = zn->backward;
            }
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    } else if (o->type == OBJ_HASH) {
        /* Save a hash value */
        if (o->encoding == OBJ_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);

            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;

        } else if (o->encoding == OBJ_ENCODING_HT) {
            dictIterator *di = dictGetIterator(o->ptr);
            dictEntry *de;

            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
                dictReleaseIterator(di);
                return -1;
            }
            nwritten += n;

            while((de = dictNext(di)) != NULL) {
                sds field = dictGetKey(de);
                sds value = dictGetVal(de);

                if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
                        sdslen(field))) == -1)
                {
                    dictReleaseIterator(di);
                    return -1;
                }
                nwritten += n;
                if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
                        sdslen(value))) == -1)
                {
                    dictReleaseIterator(di);
                    return -1;
                }
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else {
            serverPanic("Unknown hash encoding");
        }
    } else if (o->type == OBJ_STREAM) {
        /* Store how many listpacks we have inside the radix tree. */
        stream *s = o->ptr;
        rax *rax = s->rax;
        if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
        nwritten += n;

        /* Serialize all the listpacks inside the radix tree as they are,
         * when loading back, we'll use the first entry of each listpack
         * to insert it back into the radix tree. */
        raxIterator ri;
        raxStart(&ri,rax);
        raxSeek(&ri,"^",NULL,0);
        while (raxNext(&ri)) {
            unsigned char *lp = ri.data;
            size_t lp_bytes = lpBytes(lp);
            if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
            nwritten += n;
            if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
            nwritten += n;
        }
        raxStop(&ri);

        /* Save the number of elements inside the stream. We cannot obtain
         * this easily later, since our macro nodes should be checked for
         * number of items: not a great CPU / space tradeoff. */
        if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
        nwritten += n;
        /* Save the last entry ID. */
        if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
        nwritten += n;
        if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
        nwritten += n;

        /* The consumer groups and their clients are part of the stream
         * type, so serialize every consumer group. */

        /* Save the number of groups. */
        size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
        if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
        nwritten += n;

        if (num_cgroups) {
            /* Serialize each consumer group. */
            raxStart(&ri,s->cgroups);
            raxSeek(&ri,"^",NULL,0);
            while(raxNext(&ri)) {
                streamCG *cg = ri.data;

                /* Save the group name. */
                if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
                    return -1;
                nwritten += n;

                /* Last ID. */
                if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
                nwritten += n;
                if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
                nwritten += n;

                /* Save the global PEL. */
                if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
                nwritten += n;

                /* Save the consumers of this group. */
                if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
                nwritten += n;
            }
            raxStop(&ri);
        }
    } else if (o->type == OBJ_MODULE) {
        /* Save a module-specific value. */
        RedisModuleIO io;
        moduleValue *mv = o->ptr;
        moduleType *mt = mv->type;
        moduleInitIOContext(io,mt,rdb,key);

        /* Write the "module" identifier as prefix, so that we'll be able
         * to call the right module during loading. */
        int retval = rdbSaveLen(rdb,mt->id);
        if (retval == -1) return -1;
        io.bytes += retval;

        /* Then write the module-specific representation + EOF marker. */
        mt->rdb_save(&io,mv->value);
        retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
        if (retval == -1) return -1;
        io.bytes += retval;

        if (io.ctx) {
            moduleFreeContext(io.ctx);
            zfree(io.ctx);
        }
        return io.error ? -1 : (ssize_t)io.bytes;
    } else {
        serverPanic("Unknown object type");
    }
    return nwritten;
}
//优化处理1.提前分配好rio内存空间

集群

数据结构

哨兵模式

模块

上一篇 下一篇

猜你喜欢

热点阅读