Redis源码学习之RDB
2019-05-15 本文已影响0人
lixin_karl
RDB持久化
一、RDB文件的创建与载入
Redis是一个内存服务器,如果关闭Redis进程,那么数据库的数据即消失不见,为了解决这个问题,Redis提供了RDB持久化功能,我们可以把数据库内存中的数据保存到RDB文件中。
保存到RDB文件可以手动输入BGSAVE或者SAVE命令执行,还可以根据数据库的配置定期执行。
其中,SAVE命令是阻塞SAVE,即执行此命令期间,数据库不处理任何其他客户端请求直到命令执行结束。而BGSAVE,系统会开启一个子进程专门用来生成RDB文件,父进程继续处理客户端命令。
而对于数据库配置来说,再redis.conf文件中配置如下的话,900s内数据库至少被修改了
save 900 20
save 200 10
20次,或者200s内数据库至少被修改了10次,就自动持久化到RDB文件。
由于AOF文件的更新频率是大于RDB的,如果开启的AOF功能,那么会优先使用AOF加载数据库。
二、根据源代码看RDB文件存储结构
RDB文件结构如下
REDIS | db_version | databases | checksum | |
---|---|---|---|---|
REDIS字符串 | 四字节 | 实际数据 | 结束标志 | 校验和 |
databases的结构
DB标志 | DBindex | db键个数标志 | db键个数 | db过期键个数 | 键值对 |
---|---|---|---|---|---|
SELECTDB | index | RESIZEDB | keynums | expirekeynums | k-v pairs |
k-v pairs的结构
过期时间标志 | 过期时间 | 值类型 | 键 | 值 |
---|---|---|---|---|
EXPIRETIME_MS | expire_time | 类型 | 字符串 | 值 |
下面根据源代码看databases存储格式
1. bgsaveCommand
void bgsaveCommand(client *c) {
int schedule = 0;
//如果有schedule选项,当AOF正在运行时,BGSAVE会等AOF完成
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
schedule = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);//就是更新选择的数据库id
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
} else if (server.aof_child_pid != -1) {//aof正在运行
if (schedule) {
server.rdb_bgsave_scheduled = 1;//
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
"An AOF log rewriting in progress: can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {//实际处理BGSAVE函数
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
}
2. rdbSaveBackground
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;//子进程id
long long start;//开始时间
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;//aof保存正在运行或者rdb正在运行,不能再rdb了
server.dirty_before_bgsave = server.dirty;//bgsave之前的脏数据个数
server.lastbgsave_try = time(NULL);//最近一次bgsave开始
openChildInfoPipe();//打开父子进程通信的无名管道
start = ustime();//开始时间
if ((childpid = fork()) == 0) {//子进程中childid还是等于0
int retval;
closeListeningSockets(0);//子进程关闭复制过来的监听端口
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);//save操作实际调用函数
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);//成功退出
} else {//父进程
server.stat_fork_time = ustime()-start;//fork需要的时间
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {//如果fork失败了
closeChildInfoPipe();//关闭匿名通道
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;//设置child的pid
server.rdb_child_type = RDB_CHILD_TYPE_DISK;//写到rdb文件标志
updateDictResizePolicy();//如果没有rdb aof操作才可以resize
return C_OK;
}
return C_OK; /* unreached */
}
3. rdbSave
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /*当前存储错误信息的工作路径*/
FILE *fp;
rio rdb;
int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());//先存储为tmp rdb文件
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}
rioInitWithFile(&rdb,fp);//初始化rio(Redis IO)
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);//设置每次读取的字节数
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {//数据写入rdb文件
errno = error;
goto werr;
}
//保证缓存里面的数据全部写到fp中去了
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
//重命名rdb文件
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}
serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}
4. rdbSaveRio
以下代码就是真正的写入RDB的函数了,从以下代码中可以清晰的看出RDB文件开头为
REDIS db_version databases(DB标志 DBindex db键个数标志 db键个数 db过期键个数 保存键值对) 结束标志(EOF) 校验和。其中键值对保存方式见下一个函数。
//将数据库中的数据写入rdb文件的函数
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;//字典迭代器
dictEntry *de;//字典的一个实体信息
char magic[10];
int j;
uint64_t cksum;//校验和
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;//更新生成校验和的函数
//REDIS0009 (9个字节) 即REDIS db_version
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;//原始数据写到rdb文件中
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;//写入一些其他信息
for (j = 0; j < server.dbnum; j++) {//遍历每个数据库 即databases的开始
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);//更新迭代器
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//选择的db标志
if (rdbSaveLen(rdb,j) == -1) goto werr;//第几个db
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//db键个数标志
if (rdbSaveLen(rdb,db_size) == -1) goto werr;//db键个数
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//过期键的键个数
/* 遍历 DB的key-val*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;//保存键值对和过期时间
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL;
}
//保存lua脚本的信息
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;//写入结束标志
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;//checksum 写入校验和
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
5. rdbSaveKeyValuePair 保存键值对
上一小节中的保存键值对结构为:(过期时间标志 过期时间 值类型信息 写入键 写入值 )。
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;//最近常用
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;//最不常用
/* Save the expire time */
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;//设置过期时间标志
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;//写入过期时间
}
/* 保存LRU信息. */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
/*保存LFU信息*/
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
if (rdbSaveObjectType(rdb,val) == -1) return -1;//先保存值得类型
if (rdbSaveStringObject(rdb,key) == -1) return -1;//再保存字符串key
if (rdbSaveObject(rdb,val) == -1) return -1;//再保存值
return 1;
}
三、od查看RDB文件
od -c dump.rdb
里面只有一个 set karl hlx 然后执行save命令后的rdb文件。
捕获.PNG
结构 | 上图中对应部分 |
---|---|
REDIS | REDIS |
db_version | 0006 |
376 \0 | SELECT 0(选择0号数据库) |
\0 | 值类型RDB_TYPE_STRING |
004 | 键长 |
karl | 键 |
003 | 值长 |
值 | |
377 | EOF(结束标志) |
剩余的 | 校验和 |
四、参考
-
《Redis设计与实现》