Redis源码学习笔记

Redis源码学习之RDB

2019-05-15  本文已影响0人  lixin_karl

RDB持久化

一、RDB文件的创建与载入

   Redis是一个内存服务器,如果关闭Redis进程,那么数据库的数据即消失不见,为了解决这个问题,Redis提供了RDB持久化功能,我们可以把数据库内存中的数据保存到RDB文件中。

   保存到RDB文件可以手动输入BGSAVE或者SAVE命令执行,还可以根据数据库的配置定期执行。

   其中,SAVE命令是阻塞SAVE,即执行此命令期间,数据库不处理任何其他客户端请求直到命令执行结束。而BGSAVE,系统会开启一个子进程专门用来生成RDB文件,父进程继续处理客户端命令。

   而对于数据库配置来说,再redis.conf文件中配置如下的话,900s内数据库至少被修改了

save 900 20
save 200 10

20次,或者200s内数据库至少被修改了10次,就自动持久化到RDB文件。

   由于AOF文件的更新频率是大于RDB的,如果开启的AOF功能,那么会优先使用AOF加载数据库。

二、根据源代码看RDB文件存储结构

   RDB文件结构如下

REDIS db_version databases checksum
REDIS字符串 四字节 实际数据 结束标志 校验和

   databases的结构

DB标志 DBindex db键个数标志 db键个数 db过期键个数 键值对
SELECTDB index RESIZEDB keynums expirekeynums k-v pairs

   k-v pairs的结构

过期时间标志 过期时间 值类型
EXPIRETIME_MS expire_time 类型 字符串

下面根据源代码看databases存储格式

1. bgsaveCommand
void bgsaveCommand(client *c) {
    int schedule = 0;
    //如果有schedule选项,当AOF正在运行时,BGSAVE会等AOF完成
    if (c->argc > 1) {
        if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
            schedule = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);//就是更新选择的数据库id

    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {//aof正在运行
        if (schedule) {
            server.rdb_bgsave_scheduled = 1;//
            addReplyStatus(c,"Background saving scheduled");
        } else {
            addReplyError(c,
                "An AOF log rewriting in progress: can't BGSAVE right now. "
                "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
                "possible.");
        }
    } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {//实际处理BGSAVE函数
        addReplyStatus(c,"Background saving started");
    } else {
        addReply(c,shared.err);
    }
} 
2. rdbSaveBackground
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;//子进程id
    long long start;//开始时间

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;//aof保存正在运行或者rdb正在运行,不能再rdb了

    server.dirty_before_bgsave = server.dirty;//bgsave之前的脏数据个数
    server.lastbgsave_try = time(NULL);//最近一次bgsave开始
    openChildInfoPipe();//打开父子进程通信的无名管道
    start = ustime();//开始时间
    if ((childpid = fork()) == 0) {//子进程中childid还是等于0
        int retval;
        closeListeningSockets(0);//子进程关闭复制过来的监听端口
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);//save操作实际调用函数
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);
            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);//成功退出
    } else {//父进程
        server.stat_fork_time = ustime()-start;//fork需要的时间
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {//如果fork失败了
            closeChildInfoPipe();//关闭匿名通道
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;//设置child的pid
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;//写到rdb文件标志
        updateDictResizePolicy();//如果没有rdb aof操作才可以resize
        return C_OK;
    }
    return C_OK; /* unreached */
}
3. rdbSave
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /*当前存储错误信息的工作路径*/
    FILE *fp;
    rio rdb;
    int error = 0;
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());//先存储为tmp rdb文件
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }
    rioInitWithFile(&rdb,fp);//初始化rio(Redis IO)
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);//设置每次读取的字节数

    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {//数据写入rdb文件
        errno = error;
        goto werr;
    }
    //保证缓存里面的数据全部写到fp中去了
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    //重命名rdb文件
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;
werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}
4. rdbSaveRio

   以下代码就是真正的写入RDB的函数了,从以下代码中可以清晰的看出RDB文件开头为

REDIS db_version databases(DB标志 DBindex db键个数标志 db键个数 db过期键个数 保存键值对) 结束标志(EOF) 校验和。其中键值对保存方式见下一个函数。

//将数据库中的数据写入rdb文件的函数
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;//字典迭代器
    dictEntry *de;//字典的一个实体信息
    char magic[10];
    int j;
    uint64_t cksum;//校验和
    size_t processed = 0;
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;//更新生成校验和的函数
    //REDIS0009 (9个字节) 即REDIS db_version
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;//原始数据写到rdb文件中
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;//写入一些其他信息
    for (j = 0; j < server.dbnum; j++) {//遍历每个数据库 即databases的开始
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);//更新迭代器
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//选择的db标志
        if (rdbSaveLen(rdb,j) == -1) goto werr;//第几个db
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//db键个数标志
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;//db键个数
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//过期键的键个数
        /* 遍历 DB的key-val*/
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;//保存键值对和过期时间

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }
    //保存lua脚本的信息
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;//写入结束标志
    
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;//checksum 写入校验和
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
5. rdbSaveKeyValuePair 保存键值对

   上一小节中的保存键值对结构为:(过期时间标志 过期时间 值类型信息 写入键 写入值 )。

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;//最近常用
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;//最不常用
    /* Save the expire time */
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;//设置过期时间标志
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;//写入过期时间
    }

    /* 保存LRU信息. */
    if (savelru) {
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    /*保存LFU信息*/
    if (savelfu) {
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }
    
    if (rdbSaveObjectType(rdb,val) == -1) return -1;//先保存值得类型
    if (rdbSaveStringObject(rdb,key) == -1) return -1;//再保存字符串key
    if (rdbSaveObject(rdb,val) == -1) return -1;//再保存值
    return 1;
}
三、od查看RDB文件

​ od -c dump.rdb
里面只有一个 set karl hlx 然后执行save命令后的rdb文件。


捕获.PNG
结构 上图中对应部分
REDIS REDIS
db_version 0006
376 \0 SELECT 0(选择0号数据库)
\0 值类型RDB_TYPE_STRING
004 键长
karl
003 值长
377 EOF(结束标志)
剩余的 校验和
四、参考
上一篇下一篇

猜你喜欢

热点阅读