Redis源码研究之主从复制
本文主要说明Redis中主从服务器之间复制的实现。
建议阅读:
1、Redis主从服务器功能的理论部分见:Redis之主从服务器的复制
I、上帝视角
1、为完成主从复制操作,redisServer
结构体中维护了几个相关的数据:
/* src/redis.h/redisServer
 *
 * Excerpt of the global server state: only the replication-related fields
 * are shown ("......" marks elided members). The first group is used when
 * the instance acts as a master, the second when it acts as a slave. */
struct redisServer {
......
    /* Replication (master) */
    // DB index most recently SELECTed on the replication stream
    int slaveseldb; /* Last SELECTed DB in replication output */
    // Global replication offset (a running total)
    long long master_repl_offset; /* Global replication offset */
    // Heartbeat period: how often the master PINGs its slaves
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // Pointer to the replication backlog buffer
    char *repl_backlog; /* Replication backlog for partial syncs */
    // Capacity of the backlog
    long long repl_backlog_size; /* Backlog circular buffer size */
    // Amount of data currently held in the backlog
    long long repl_backlog_histlen; /* Backlog actual data length */
    // Current write index inside the backlog
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // Replication offset of the oldest byte still available in the backlog
    long long repl_backlog_off; /* Replication offset of first byte in the
                                   backlog buffer. */
    // How long the backlog is kept once there are no slaves
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
    // Time since which there have been no slaves
    time_t repl_no_slaves_since; /* We have no slaves since that time.
                                    Only valid if server.slaves len is 0. */
    // Minimum number of slaves required to accept writes
    int repl_min_slaves_to_write; /* Min number of slaves to write. */
    // Maximum lag allowed for the slaves counted above
    int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
    // Number of slaves whose lag is within the limit
    int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
    /* Replication (slave) */
    // Password used to authenticate with the master
    char *masterauth; /* AUTH with this password with master */
    // Master address
    char *masterhost; /* Hostname of master */
    // Master port
    int masterport; /* Port of master */
    // Timeout
    int repl_timeout; /* Timeout after N seconds of master idle */
    // Client structure representing the master
    redisClient *master; /* Client that is master for this slave */
    // Cached master, reused when attempting PSYNC
    redisClient *cached_master; /* Cached master to be reused for PSYNC. */
    int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
    // Replication state (meaningful when this server is a slave)
    int repl_state; /* Replication status if the instance is a slave */
    // Size of the RDB file
    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
    // Number of RDB bytes read so far
    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
    // Offset at the time of the last fsync
    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
    // Socket connected to the master
    int repl_transfer_s; /* Slave -> Master SYNC socket */
    // Descriptor of the temp file the RDB is saved into
    int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
    // Name of that temp file
    char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
    // Time of the most recent RDB read
    time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
    int repl_serve_stale_data; /* Serve stale data when link is down? */
    // Is this a read-only slave?
    int repl_slave_ro; /* Slave is read only? */
    // Timestamp at which the link went down (a point in time, not a duration)
    time_t repl_down_since; /* Unix time at which link with master went down */
    // Disable TCP_NODELAY after SYNC?
    int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
    // Slave priority
    int slave_priority; /* Reported in INFO and used by Sentinel. */
    // Run id of the master this slave is currently attached to
    char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
    // Initial offset
    long long repl_master_initial_offset; /* Master PSYNC offset. */
};
2、下面我们看关于上述代码中提到的复制积压缓冲区的内容,这是实现部分重同步的重要结构:
复制积压缓冲区可以看成是一个队列,当执行一个Redis命令时,积压缓冲区中的内容将会更新,在源码中调用顺序是这样的call()-->propagate()-->replicationFeedSlaves()
:
/* Invoke the implementation of a command, i.e. execute the command.
 * src/redis.c/call
 *
 * Excerpt ("......" marks elided code). After running the command, the
 * function decides from the caller's flags, the client's flags and the
 * change in server.dirty whether to hand the command to propagate(). */
void call(redisClient *c, int flags) {
......
/* Call the command. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate);
// Snapshot the dirty counter so the delta can be computed afterwards
dirty = server.dirty;
// Run the function that implements the command
c->cmd->proc(c);
// dirty now holds how much server.dirty grew while the command ran
dirty = server.dirty-dirty;
duration = ustime()-start;
......
/* Propagate the command into the AOF and replication link */
if (flags & REDIS_CALL_PROPAGATE) {
// NOTE: this local 'flags' shadows the function parameter of the same name
int flags = REDIS_PROPAGATE_NONE;
// Replication explicitly forced by the client flags
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
// AOF persistence explicitly forced by the client flags
if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
// The dirty counter changed: propagate to both targets
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
// Propagate only if at least one target was selected
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags); // command propagation
}
......
}
3、propagate()
函数有两个传播方向,一个是AOF持久化,一个是主从复制:
/* Command propagation.
 * src/redis.c/propagate
 *
 * 'flags' is a bitwise OR of the following values:
 *
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF  (propagate into the AOF file if it is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int aof_enabled = (server.aof_state != REDIS_AOF_OFF);

    /* AOF direction: only when the AOF is not switched off. */
    if (aof_enabled && (flags & REDIS_PROPAGATE_AOF)) {
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    }

    /* Replication direction: feed the slaves list. */
    if (flags & REDIS_PROPAGATE_REPL) {
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
}
4、关于AOF的内容我们已经在前面讲过,我们下面看replicationFeedSlaves
是如何实现的:
/* Send the given command arguments to the slaves.
 * src/replication.c/replicationFeedSlaves
 *
 * The work is done in three steps:
 * 1) build the protocol representation of the command,
 * 2) append it to the replication backlog (when one exists),
 * 3) queue it on the output of every slave in 'slaves'.
 *
 * A SELECT for 'dictid' is emitted first whenever it differs from
 * server.slaveseldb. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
// Otherwise build the SELECT command on the fly
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
// Note: unlike the command loop below, this loop does not filter on replstate
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
// Release only the object created above; the shared ones are left untouched
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[REDIS_LONGSTR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
// Reuse the CRLF already sitting at the end of the length header
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
II、主从关系的建立
1、主从关系的建立是通过向从服务器发送SLAVEOF
命令完成的,从服务器收到命令后会自动连接主机,并注册相应的读写事件:
/* SLAVEOF <host> <port> | SLAVEOF NO ONE
 *
 * "NO ONE" turns the instance back into a master (only if it currently has
 * a master configured). Any other host/port pair makes the instance start
 * replicating from that address, unless it is the master we already use. */
void slaveofCommand(redisClient *c) {
    char *host;
    int want_master;

    /* SLAVEOF is refused in cluster mode, where replication is configured
     * automatically from the address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    host = c->argv[1]->ptr;
    want_master = !strcasecmp(host,"no") &&
                  !strcasecmp(c->argv[2]->ptr,"one");

    if (want_master) {
        /* Nothing to undo when we are not a slave already. */
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
        addReply(c,shared.ok);
        return;
    }

    {
        long port;

        /* Parse the port argument; on failure the helper has already
         * replied to the client. */
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Same host and port as the current master: reply and do nothing. */
        if (server.masterhost &&
            !strcasecmp(server.masterhost,host) &&
            server.masterport == port)
        {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

        /* No previous master, or a different one was requested. */
        replicationSetMaster(host, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
2、同步的选取
当从服务器发起同步时,首先会判断是否可以执行部分重同步,若不行则只能执行全同步。
其判断及调用过程如下:
/* Callback used by the slave to synchronize with its master.
 * src/replication.c/syncWithMaster
 *
 * The function is driven by server.repl_state:
 *   - REDIS_REPL_CONNECTING:   send a PING and wait for the reply.
 *   - REDIS_REPL_RECEIVE_PONG: read the reply, then AUTH (if configured),
 *     announce the listening port, and try PSYNC; fall back to SYNC when
 *     PSYNC is not supported.
 * When a full transfer is needed, a temp file is opened and
 * readSyncBulkPayload is installed as the readable handler for 'fd'.
 *
 * On any failure the socket is closed and the state goes back to
 * REDIS_REPL_CONNECT. The temp-file descriptor is tracked from the start
 * (initialised to -1) so the error path can close it instead of leaking it
 * when a failure happens after the file was opened. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        /* Return and wait for the PONG to arrive. */
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            /* Neither +PONG nor an authentication error: give up. */
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                     NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* From here on psync_result is PSYNC_FULLRESYNC or PSYNC_NOT_SUPPORTED. */

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    /* Enter the transfer state and reset the transfer bookkeeping. */
    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* Close the temp file if it was opened before the failure. */
    if (dfd != -1) close(dfd);
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
3、syncWithMaster
主要通过调用slaveTryPartialResynchronization
来判断是否能进行部分重同步,否则进行全同步:
/* src/replication.c/slaveTryPartialResynchronization
 *
 * Try a partial resynchronization with the master if we are about to reconnect.
 *
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master run id and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'fd' as file descriptor of the server.master client structure.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master run_id and global replication
 *                   offset is saved.
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 */
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
/* Initially set repl_master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.repl_master_initial_offset = -1;
if (server.cached_master) {
// A cached master exists: attempt a partial resync with
// "PSYNC <master_run_id> <repl_offset>"
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
} else {
// No cached master: send "PSYNC ? -1" to ask for a full resync
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
// Copies the two characters plus the terminating NUL
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
// Reply is +FULLRESYNC: a full resync will follow
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
if (offset) offset++;
}
// Both fields must be present and the run id must have the expected length
if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
redisLog(REDIS_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* runid to make sure next PSYNCs will fail. */
memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
} else {
// Store the run id ...
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
// ... and the initial offset
server.repl_master_initial_offset = strtoll(offset,NULL,10);
redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
// Reply is +CONTINUE: the partial resync was accepted
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted, set the replication state accordingly */
redisLog(REDIS_NOTICE,
"Successful partial resynchronization with master.");
sdsfree(reply);
// Promote the cached master to be the current master, reusing 'fd'
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
}
/* If we reach this point we received either an error since the master does
* not understand PSYNC, or an unexpected reply from the master.
* Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
redisLog(REDIS_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
redisLog(REDIS_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
III、全同步
全同步发生在两种情况下:
1、从服务器第一次连接到主服务器;
2、从服务器断线重连之后,它所请求的offset之后的数据已经不在主服务器的复制积压缓冲区中(即无法进行部分重同步);
syncCommand()
是同步操作的执行函数,包括SYNC
与PSYNC
,下面我们只看其SYNC
部分:
/* SYNC and PSYNC command implementation (master side).
 * src/replication.c/syncCommand
 *
 * Excerpt showing the full-resynchronization path only ("......" marks the
 * elided beginning of the function, where the partial resync is attempted).
 * Depending on whether a BGSAVE child is already running, the new slave 'c'
 * is put in REDIS_REPL_WAIT_BGSAVE_END or REDIS_REPL_WAIT_BGSAVE_START,
 * or a new BGSAVE is started for it. The client is then flagged as a slave
 * and appended to server.slaves. */
void syncCommand(redisClient *c) {
......
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        /* Look for a slave that is already waiting for this BGSAVE to end. */
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        /* The slave now waits for the BGSAVE that was just started. */
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    /* The first slave triggers the creation of the backlog if missing. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
主机在执行完BGSAVE
之后,会将RDB文件发送给从服务器,这是通过调用backgroundSaveDoneHandler
完成的:
/* Handle the termination of the BGSAVE child.
 * src/rdb.c/backgroundSaveDoneHandler
 *
 * 'exitcode' is the child's exit status and 'bysignal' is non-zero when the
 * child was terminated by a signal (it is logged as the signal number).
 * Updates the save-related server state and then notifies the slaves that
 * were waiting for this BGSAVE.
 *
 * The result handed to updateSlavesWaitingBgsave() is REDIS_OK only when
 * the child exited normally with status 0. Testing 'exitcode' alone would
 * report success for a child killed by a signal. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    int saved_ok = (!bysignal && exitcode == 0);

    if (saved_ok) {
        /* BGSAVE succeeded. */
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;
    } else if (!bysignal && exitcode != 0) {
        /* BGSAVE exited with an error. */
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;
    } else {
        /* BGSAVE was interrupted by a signal. */
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        rdbRemoveTempFile(server.rdb_child_pid);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    }

    /* Update the server state. */
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;

    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(saved_ok ? REDIS_OK : REDIS_ERR);
}
这其中关于主从复制的操作就是调用了updateSlavesWaitingBgsave
:
/* This function is called at the end of every background saving.
* 在每次 BGSAVE 执行完毕之后使用
*
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
* bgsaveerr 可能是 REDIS_OK 或者 REDIS_ERR ,显示 BGSAVE 的执行结果
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization.
*
* 这个函数是在 BGSAVE 完成之后的异步回调函数,
* 它指导该怎么执行和 slave 相关的 RDB 下一步工作。
*/
/*src/replication.c/updateSlavesWaitingBgsave*/
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
// 遍历所有 slave
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
// 之前的 RDB 文件不能被 slave 使用,
// 开始新的 BGSAVE
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
// 执行到这里,说明有 slave 在等待 BGSAVE 完成
struct redis_stat buf;
// 但是 BGSAVE 执行错误
if (bgsaveerr != REDIS_OK) {
// 释放 slave
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
// 打开 RDB 文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
// 设置偏移量,各种值
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
// 更新状态
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
// 清空之前的写事件处理器
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// 将 sendBulkToSlave 安装为 slave 的写事件处理器
// 它用于将 RDB 文件发送给 slave
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
// 需要执行新的 BGSAVE
if (startbgsave) {
/* Since we are starting a new background save for one or more slaves,
* we flush the Replication Script Cache to use EVAL to propagate every
* new EVALSHA for the first time, since all the new slaves don't know
* about previous scripts. */
// 开始行的 BGSAVE ,并清空脚本缓存
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
IV、部分重同步
之前已经说明,部分重同步功能是通过复制偏移量,复制积压缓冲区等结构维护的。接下来我们看其是如何进行主从服务器交互的:
1、从机在connectWithMaster
连接主机时会将syncWithMaster
设置为回调函数:
/* Registered as the callback when the slave connects to its master in
 * connectWithMaster().
 * src/replication.c/syncWithMaster
 *
 * Excerpt ("......" marks elided code) showing only the point where the
 * slave attempts a partial resynchronization. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
......
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
// The call returns one of three values:
// PSYNC_CONTINUE: the master accepted a partial resync (it replied +CONTINUE)
// PSYNC_FULLRESYNC: a full resync is needed; the RDB file will be downloaded
// PSYNC_NOT_SUPPORTED: the master does not understand PSYNC; fall back to SYNC
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
"Partial Resynchronization.");
return;
}
// Otherwise continue with the full resync
......
}
2、其中的slaveTryPartialResynchronization
函数用于判断是进行部分重同步还是全同步,这在上面已经说明。
下面看syncCommand
中关于部分重同步的部分:
/* src/replication.c/syncCommand
 *
 * Excerpt ("......" marks elided code) showing how the master handles the
 * PSYNC variant of the command before falling through to a full resync. */
void syncCommand(redisClient *c) {
......
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// Partial resync accepted
if (masterTryPartialResynchronization(c) == REDIS_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// Partial resync refused: a full resync follows. argv[1] is the
// run id sent by the slave.
char *master_runid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not able to partially
* resync. */
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= REDIS_PRE_PSYNC_SLAVE;
}
// Full resync path
......
}
3、主机收到部分重同步请求之后也会判断是否允许:
/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * c->argv[1] is the run id advertised by the slave and c->argv[2] is the
 * replication offset it asks to resume from.
 *
 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
 * with the usual full resync. (The need_full_resync tail is elided in this
 * excerpt and marked with "......".) */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* master_runid is the run id of the master the slave has cached,
         * server.runid is the run id of this instance. A mismatch means
         * this is not the master the slave was attached to, so only a
         * full resync is possible. */
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
        REDIS_OK) goto need_full_resync;

    /* A partial resync is impossible when:
     * 1) there is no backlog at all, or
     * 2) psync_offset falls outside the backlog: either before its first
     *    byte (the backlog has a limited size, so a slave that missed too
     *    many updates can no longer find them there) or past its end. */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog "
            "(Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is "
                "greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    /* Queue the backlog content starting at the requested offset. */
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of "
        "backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
......
    /* Reply to the slave with: +FULLRESYNC <runid> <repl_offset> */
}
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》