Redis源码研究之主从复制

2018-05-05  本文已影响123人  wenmingxing

本文主要说明Redis中主从服务器之间复制的实现。

建议阅读:
1、Redis主从服务器功能的理论部分见:Redis之主从服务器的复制

I、上帝视角

1、为完成主从复制操作,redisServer结构体中维护了几个相关的数据:

/*src/redis.h/redisServer*/
struct redisServer {
    ......
    /* Replication (master) */
    // 最近一次使用(访问)的数据集
    int slaveseldb; /* Last SELECTed DB in replication output */
    // 全局复制偏移量(一个累积值)
    long long master_repl_offset; /* Global replication offset */
    // 主从连接心跳频率(发送ping的频率)
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // 指向复制积压缓冲区backlog
    char *repl_backlog; /* Replication backlog for partial syncs */
    // backlog大小
    long long repl_backlog_size; /* Backlog circular buffer size */
    // backlog中写入的新数据的大小
    long long repl_backlog_histlen; /* Backlog actual data length */
    // backlog的当前索引
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // backlog中可以被还原的第一个字节的偏移量
    long long repl_backlog_off; /* Replication offset of first byte in the
    backlog buffer. */
    // 积压空间有效时间,即过期时间
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
    gets released. */
    
    // 距离上一次有从服务器的时间
    time_t repl_no_slaves_since;    /* We have no slaves since that time.
                                       Only valid if server.slaves len is 0. */

    // 是否开启最小数量从服务器写入功能
    int repl_min_slaves_to_write;   /* Min number of slaves to write. */
    // 定义最小数量从服务器的最大延迟值
    int repl_min_slaves_max_lag;    /* Max lag of <count> slaves to write. */
    // 延迟良好的从服务器的数量
    int repl_good_slaves_count;     /* Number of slaves with lag <= max_lag. */


    /* Replication (slave) */
    // 主服务器的验证密码
    char *masterauth;               /* AUTH with this password with master */
    // 主服务器的地址
    char *masterhost;               /* Hostname of master */
    // 主服务器的端口
    int masterport;                 /* Port of master */
    // 超时时间
    int repl_timeout;               /* Timeout after N seconds of master idle */
    // 主服务器所对应的客户端
    redisClient *master;     /* Client that is master for this slave */
    // 被缓存的主服务器,PSYNC 时使用
    redisClient *cached_master; /* Cached master to be reused for PSYNC. */
    int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
    // 复制的状态(服务器是从服务器时使用)
    int repl_state;          /* Replication status if the instance is a slave */
    // RDB 文件的大小
    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
    // 已读 RDB 文件内容的字节数
    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
    // 最近一次执行 fsync 时的偏移量
    // 用于 sync_file_range 函数
    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
    // 主服务器的套接字
    int repl_transfer_s;     /* Slave -> Master SYNC socket */
    // 保存 RDB 文件的临时文件的描述符
    int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
    // 保存 RDB 文件的临时文件名字
    char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
    // 最近一次读入 RDB 内容的时间
    time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
    int repl_serve_stale_data; /* Serve stale data when link is down? */
    // 是否只读从服务器?
    int repl_slave_ro;          /* Slave is read only? */
    // 连接断开的时长
    time_t repl_down_since; /* Unix time at which link with master went down */
    // 是否要在 SYNC 之后关闭 NODELAY ?
    int repl_disable_tcp_nodelay;   /* Disable TCP_NODELAY after SYNC? */
    // 从服务器优先级
    int slave_priority;             /* Reported in INFO and used by Sentinel. */
    // 本服务器(从服务器)当前主服务器的 RUN ID
    char repl_master_runid[REDIS_RUN_ID_SIZE+1];  /* Master run id for PSYNC. */
    // 初始化偏移量
    long long repl_master_initial_offset;         /* Master PSYNC offset. */
}  

2、下面我们看关于上述代码中提到的复制积压缓冲区的内容,这是实现部分重同步的重要结构:

复制积压缓冲区可以看成是一个队列,当执行一个Redis命令时,积压缓冲区中的内容将会更新,在源码中调用顺序是这样的call()-->propagate()-->replicationFeedSlaves()

/*调用命令的实现函数,执行命令*/
/*src/redis.c/call*/
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // 脏数据标记,数据是否被修改
    dirty = server.dirty;
    // 执行命令对应的函数
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // 将客户端请求的数据修改记录传播给AOF 和从机
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // 强制主从复制
    if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // 强制AOF 持久化
    if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // 数据被修改
    if (dirty)
        flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // 传播数据修改记录
    if (flags != REDIS_PROPAGATE_NONE)
        propagate(c->cmd,c->db->id,c->argv,c->argc,flags);  //命令传播
    }
    ......
}  

3、propagate()函数有两个传播方向,一个是AOF持久化,一个是主从复制:

/*命令传播*/  
/*src/redis.c/propagate*/
/*FLAG 可以是以下标识的 xor :
 *
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 *   不传播
 *
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 *   传播到 AOF
 *
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 *   传播到 slave
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // 传播到 AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

    // 传播到 slave
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

4、关于AOF的内容我们已经在前面讲过,我们下面看replicationFeedSlaves是如何实现的:

// 将传入的参数发送给从服务器
/*src/replication.c/replicationFeedSlaves*/
// 操作分为三步:
// 1) 构建协议内容
// 2) 将协议内容备份到 backlog
// 3) 将内容发送给各个从服务器
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    // backlog 为空,且没有从服务器,直接返回
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    // 如果有需要的话,发送 SELECT 命令,指定数据库
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        /* Add the SELECT command into the backlog. */
        // 将 SELECT 命令添加到 backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* Send it to slaves. */
        // 发送给所有从服务器
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }

    server.slaveseldb = dictid;

    /* Write the command to the replication backlog if any. */
    // 将命令写入到backlog
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len 
             * ad add the final CRLF */
            // 将参数从对象转换成协议格式
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {

        // 指向从服务器
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        // 不要给正在等待 BGSAVE 开始的从服务器发送命令
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */
        // 向已经接收完和正在接收 RDB 文件的从服务器发送命令
        // 如果从服务器正在接收主服务器发送的 RDB 文件,
        // 那么在初次 SYNC 完成之前,主服务器发送的内容会被放进一个缓冲区里面

        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

II、主从关系的建立

1、主从服务器的建立是通过从服务器发送SLAVEOF命令完成的,从服务器会自动连接主机,注册响应的读写事件:

void slaveofCommand(redisClient *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    // 不允许在集群模式中使用
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    // SLAVEOF NO ONE 让从服务器转为主服务器
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            // 让服务器取消复制,成为主服务器
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        // 获取端口参数
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Check if we are already attached to the specified slave */
        // 检查输入的 host 和 port 是否服务器目前的主服务器
        // 如果是的话,向客户端返回 +OK ,不做其他动作
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

        /* There was no previous master or the user specified a different one,
         * we can continue. */
        // 没有前任主服务器,或者客户端指定了新的主服务器
        // 开始执行复制操作
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}  

2、同步的选取
当从服务器发起时首先会判断是否可以执行部分重同步,若不行则被迫执行全同步。
其判断及调用过程如下:

/*从服务器用于同步主服务器的回调函数*/
/*src/replication.c/syncWithMaster*/
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    // 如果处于 SLAVEOF NO ONE 模式,那么关闭 fd
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    // 检查套接字错误
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    // 如果状态为 CONNECTING ,那么在进行初次同步之前,
    // 向主服务器发送一个非阻塞的 PONG 
    // 因为接下来的 RDB 文件发送非常耗时,所以我们想确认主服务器真的能访问
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        // 手动发送同步 PING ,暂时取消监听写事件
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        // 更新状态
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        // 同步发送 PING
        syncWrite(fd,"PING\r\n",6,100);

        // 返回,等待 PONG 到达
        return;
    }

    /* Receive the PONG command. */
    // 接收 PONG 命令
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        // 手动同步接收 PONG ,暂时取消监听读事件
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        // 尝试在指定时间限制内读取 PONG
        buf[0] = '\0';
        // 同步接收 PONG
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        // 接收到的数据只有两种可能:
        // 第一种是 +PONG ,第二种是因为未验证而出现的 -NOAUTH 错误
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            // 接收到未验证错误
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            // 接收到 PONG
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    // 进行身份验证
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    // 将从服务器的端口发送给主服务器,
    // 使得主服务器的 INFO 命令可以显示从服务器正在监听的端口
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    // 根据返回的结果决定是执行部分 resync ,还是 full-resync
    psync_result = slaveTryPartialResynchronization(fd);

    // 可以执行部分 resync
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        // 返回
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    // 主服务器不支持 PSYNC ,发送 SYNC
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        // 向主服务器发送 SYNC 命令
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // 如果执行到这里,
    // 那么 psync_result == PSYNC_FULLRESYNC 或 PSYNC_NOT_SUPPORTED

    /* Prepare a suitable temp file for bulk transfer */
    // 打开一个临时文件,用于写入和保存接下来从主服务器传来的 RDB 文件数据
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    // 设置一个读事件处理器,来读取主服务器的 RDB 文件
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    // 设置状态
    server.repl_state = REDIS_REPL_TRANSFER;

    // 更新统计信息
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);

    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}  

3、syncWithMaster主要通过调用slaveTryPartialResynchronization来判断是否能进行部分重同步,否则进行全同步:

/*src/replication.c/slaveTryPartialResynchronization*/
/* Try a partial resynchronization with the master if we are about to reconnect.
 *
 * 在重连接之后,尝试进行部分重同步。
 *
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master run id and the master replication
 * global offset.
 *
 * 如果 master 缓存为空,那么通过 "PSYNC ? -1" 命令来触发一次 full resync ,
 * 让主服务器的 run id 和复制偏移量可以传到附属节点里面。
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 这个函数由 syncWithMaster() 函数调用,它做了以下假设:
 *
 * 1) We pass the function an already connected socket "fd".
 *    一个已连接套接字 fd 会被传入函数
 * 2) This function does not close the file descriptor "fd". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'fd' as file descriptor of the server.master client structure.
 *    函数不会关闭 fd 。
 *    当部分同步成功时,函数会将 fd 用作 server.master 客户端结构中的
 *    文件描述符。
 *
 * The function returns:
 * 以下是函数的返回值:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
 *                 PSYNC 命令成功,可以继续。
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master run_id and global replication
 *                   offset is saved.
 *                   主服务器支持 PSYNC 功能,但目前情况需要执行 full resync 。
 *                   在这种情况下, run_id 和全局复制偏移量会被保存。
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 *                      主服务器不支持 PSYNC ,调用者应该下降到 SYNC 命令。
 */

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        // 缓存存在,尝试部分重同步
        // 命令为 "PSYNC <master_run_id> <repl_offset>"
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        // 缓存不存在
        // 发送 "PSYNC ? -1" ,要求完整重同步
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    /* Issue the PSYNC command */
    // 向主服务器发送 PSYNC 命令
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    // 接收到 FULLRESYNC ,进行 full-resync
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        // 分析并记录主服务器的 run id
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        // 检查 run id 的合法性
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            // 主服务器支持 PSYNC ,但是却发来了异常的 run id
            // 只好将 run id 设为 0 ,让下次 PSYNC 时失败
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // 保存 run id
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            // 以及 initial offset
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            // 打印日志,这是一个 FULL resync
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        // 要开始完整重同步,缓存中的 master 已经没用了,清除它
        replicationDiscardCachedMaster();
        sdsfree(reply);
        
        // 返回状态
        return PSYNC_FULLRESYNC;
    }

    // 接收到 CONTINUE ,进行 partial resync
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        // 将缓存中的 master 设为当前 master
        replicationResurrectCachedMaster(fd);

        // 返回状态
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we receied either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */

    // 接收到错误?
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();

    // 主服务器不支持 PSYNC
    return PSYNC_NOT_SUPPORTED;
}  

III、全同步

全同步发生在两种情况下:
1、从服务器第一次连接到主服务器;
2、从服务器断线之后,复制积压缓冲区中的可恢复内容已经落后与offset;

syncCommand()是同步操作的执行函数,包括SYNCPSYNC,下面我们只看其SYNC部分:

// 主机SYNC 和PSYNC 命令处理函数,会尝试进行部分同步和全同步
/*src/replication.c/syncCommand*/
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // 主机尝试部分同步,失败的话向从机发送+FULLRESYNC master_runid offset,
    // 接着启动BGSAVE
    // 执行全同步:
    /* Full resynchronization. */
    server.stat_sync_full++;
    /* Here we need to check if there is a background saving operation
    * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* 存在BGSAVE 后台进程。
        1. 如果master 现有所连接的所有从机slaves 当中有存在
        REDIS_REPL_WAIT_BGSAVE_END 的从机,那么将从机c 设置为
        REDIS_REPL_WAIT_BGSAVE_END;
        2. 否则,设置为REDIS_REPL_WAIT_BGSAVE_START*/
        /* Ok a background save is in progress. Let's check if it is a good
        * one for replication, i.e. if there is another slave that is
        * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
        // 检测是否已经有从机申请全同步
        listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        slave = ln->value;
    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
    if (ln) {
        // 存在状态为REDIS_REPL_WAIT_BGSAVE_END 的从机slave,
        // 就将此从机c 状态设置为REDIS_REPL_WAIT_BGSAVE_END,
        // 从而在BGSAVE 进程结束后,可以发送RDB 文件,
        // 同时将从机slave 中的更新复制到此从机c。
        /* Perfect, the server is already registering differences for
        * another slave. Set the right state, and copy the buffer. */
        // 将其他从机上的待回复的缓存复制到从机c
        copyClientOutputBuffer(c,slave);
        // 修改从机c 状态为「等待BGSAVE 进程结束」
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {
        // 不存在状态为REDIS_REPL_WAIT_BGSAVE_END 的从机,就将此从机c 状态设置为
        // REDIS_REPL_WAIT_BGSAVE_START,即等待新的BGSAVE 进程的开启。
        // 修改状态为「等待BGSAVE 进程开始」
        /* No way, we need to wait for the next BGSAVE in order to
        * register differences */
        c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
        redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
    }
    } else {
        // 不存在BGSAVE 后台进程,启动一个新的BGSAVE 进程
        * Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
        addReplyError(c,"Unable to perform background save");
        return;
    }
    // 将此从机c 状态设置为REDIS_REPL_WAIT_BGSAVE_END,从而在BGSAVE
    // 进程结束后,可以发送RDB 文件,同时将从机slave 中的更新复制到此从机c。
    c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    // 因为新的slave进入,刷新复制脚本缓存
    /* Flush the script cache for the new slave. */
    replicationScriptCacheFlush();
    }
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
        c->repldbfd = -1;
        c->flags |= REDIS_SLAVE;
        server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
        listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}  

主机在执行完BGSAVE之后,会将RDB文件发送给从服务器,这是通过调用backgroundSaveDoneHandler完成的:

/*
 * 处理 BGSAVE 完成时发送的信号
 */
/*src/rdb.c/backgroundSaveDoneHandler*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {

    // BGSAVE 成功
    if (!bysignal && exitcode == 0) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        server.dirty = server.dirty - server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;

    // BGSAVE 出错
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;

    // BGSAVE 被中断
    } else {
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        // 移除临时文件
        rdbRemoveTempFile(server.rdb_child_pid);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * tirggering an error conditon. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    }

    // 更新服务器状态
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;

    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    // 处理正在等待 BGSAVE 完成的那些 slave
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

这其中关于主从复制的操作就是调用了updateSlaveWaitingBgsave:

/* This function is called at the end of every background saving.
 * 在每次 BGSAVE 执行完毕之后使用
 *
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 * bgsaveerr 可能是 REDIS_OK 或者 REDIS_ERR ,显示 BGSAVE 的执行结果
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization. 
 * 
 * 这个函数是在 BGSAVE 完成之后的异步回调函数,
 * 它指导该怎么执行和 slave 相关的 RDB 下一步工作。
 */
/*src/replication.c/updateSlavesWaitingBgsave*/
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    // 遍历所有 slave
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            // 之前的 RDB 文件不能被 slave 使用,
            // 开始新的 BGSAVE
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {

            // 执行到这里,说明有 slave 在等待 BGSAVE 完成

            struct redis_stat buf;

            // 但是 BGSAVE 执行错误
            if (bgsaveerr != REDIS_OK) {
                // 释放 slave
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }

            // 打开 RDB 文件
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }

            // 设置偏移量,各种值
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            // 更新状态
            slave->replstate = REDIS_REPL_SEND_BULK;

            slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                (unsigned long long) slave->repldbsize);

            // 清空之前的写事件处理器
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // 将 sendBulkToSlave 安装为 slave 的写事件处理器
            // 它用于将 RDB 文件发送给 slave
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // 需要执行新的 BGSAVE
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        // 开始行的 BGSAVE ,并清空脚本缓存
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

IV、部分重同步

之前已经说明,部分重同步功能是通过复制偏移量复制积压缓冲区等结构维护的。接下来我们看其是如何进行主从服务器交互的:

1、从机在connectWithMaster连接主机时会将syncWithMaster设置为回调函数:

// 连接主机connectWithMaster() 的时候,会被注册为回调函数
/*src/replication.c/syncWithMaster*/
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // 尝试部分同步,主机允许进行部分同步会返回+CONTINUE,从机接收后注册相应的事件
    /* Try a partial resynchonization. If we don't have a cached master
    * slaveTryPartialResynchronization() will at least try to use PSYNC
    * to start a full resynchronization so that we get the master run id
    * and the global offset, to try a partial resync at the next
    * reconnection attempt. */
    // 函数返回三种状态:
    // PSYNC_CONTINUE:表示会进行部分同步,在slaveTryPartialResynchronization()
    // 中已经设置回调函数readQueryFromClient()
    // PSYNC_FULLRESYNC:全同步,会下载RDB 文件
    // PSYNC_NOT_SUPPORTED:未知
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
        "Partial Resynchronization.");
        return;
    }
    // 执行全同步
    ......
}  

2、其中的slaveTryPartialResynchronization函数即为判断是进行部分重同步还是全同步这在上面已经说明。

下面看syncCommand中关于部分重同步的部分:

/*src/replication.c/syncCommand*/
void syncCommand(redisClient *c) {
    ......
    // 主机尝试部分同步,允许则进行部分同步,会返回+CONTINUE,接着发送积压空间
    /* Try a partial resynchronization if this is a PSYNC command.
    * If it fails, we continue with usual full resynchronization, however
    * when this happens masterTryPartialResynchronization() already
    * replied with:
    **
    +FULLRESYNC <runid> <offset>
    **
    So the slave knows the new runid and offset to try a PSYNC later
    * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 部分同步
    if (masterTryPartialResynchronization(c) == REDIS_OK) {
        server.stat_sync_partial_ok++;
        return; /* No full resync needed, return. */
    } else {
        // 部分同步失败,会进行全同步,这时会收到来自客户端的runid
        char *master_runid = c->argv[1]->ptr;
        /* Increment stats for failed PSYNCs, but only if the
        * runid is not "?", as this is used by slaves to force a full
        * resync on purpose when they are not albe to partially
        * resync. */
    if (master_runid[0] != '?') server.stat_sync_partial_err++;
    }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
        * of the replication protocol (like redis-cli --slave). Flag the client
        * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }
    // 执行全同步:
    ......
}  

3、主机收到部分重同步请求之后也会判断是否允许:

// 主机尝试是否能进行部分同步
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
**
On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;
    /* Is the runid of this master the same advertised by the wannabe slave
    * via PSYNC? If runid changed this master is a different instance and
    * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        // 当因为异常需要与主机断开连接的时候,从机会暂存主机的状态信息,以便
        // 下一次的部分同步。
        // 1)master_runid 是从机提供一个因缓存主机的runid,
        // 2)server.runid 是本机(主机)的runid。
        // 匹配失败,说明是本机(主机)不是从机缓存的主机,这时候不能进行部分同步,
        // 只能进行全同步
        // "?" 表示从机要求全同步
        // 什么时候从机会要求全同步???
    /* Run id "?" is used by slaves that want to force a full resync. */
    if (master_runid[0] != '?') {
        redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
        "Runid mismatch (Client asked for '%s', I'm '%s')",
        master_runid, server.runid);
    } else {
        redisLog(REDIS_NOTICE,"Full resync requested by slave.");
    }
        goto need_full_resync;
    }
    // 从参数中解析整数,整数是从机指定的偏移量
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
        REDIS_OK) goto need_full_resync;
        // 部分同步失败的情况:
        // 1、不存在积压空间
    if (!server.repl_backlog ||
        // 2、psync_offset 太过小,即从机错过太多更新记录,安全起见,实行全同步
        // 我们知道,积压空间的大小是有限的,如果某个从机错过的更新过多,将无法
        // 在积压空间中找到更新的记录
        psync_offset 越界
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
        // 经检测,不满足部分同步的条件,转而进行全同步
    {
    redisLog(REDIS_NOTICE,
    "Unable to partial resync with the slave for lack of backlog "
    "(Slave request was: %lld).", psync_offset);
    if (psync_offset > server. ) {
        redisLog(REDIS_WARNING,
        "Warning: slave tried to PSYNC with an offset that is "
        "greater than the master replication offset.");
    }
    goto need_full_resync;
    }
    // 执行部分同步:
    // 1)标记客户端为从机
    // 2)通知从机准备接收数据。从机收到+CONTINUE 会做好准备
    // 3)开发发送数据
    /* If we reached this point, we are able to perform a partial resync:
    * 1) Set client state to make it a slave.
    * 2) Inform the client we can continue with +CONTINUE
    * 3) Send the backlog data (from the offset to the end) to the slave. */
    // 将连接的客户端标记为从机
    c->flags |= REDIS_SLAVE;
    // 表示进行部分同步
    // #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just
    // updates. */
    c->replstate = REDIS_REPL_ONLINE;
    // 更新ack 的时间
    c->repl_ack_time = server.unixtime;
    // 添加入从机链表
    listAddNodeTail(server.slaves,c);
    // 告诉从机可以进行部分同步,从机收到后会做相关的准备(注册回调函数)
    /* We can't use the connection buffers since they are used to accumulate
    * new commands at this stage. But we are sure the socket send buffer is
    * emtpy so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // 向从机写积压空间中的数据,积压空间存储有「更新缓存」
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
    "Partial resynchronization request accepted. Sending %lld bytes of "
    "backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
    * to -1 to force the master to emit SELECT, since the slave already
    * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */
    need_full_resync:
    ......
    // 向从机发送+FULLRESYNC runid repl_offset
}  

【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》

上一篇 下一篇

猜你喜欢

热点阅读