Redis study notes

Redis master-slave replication: how it is implemented in the source code

2019-08-20  奔跑的Robi

Redis's replication implementation consists of the stages walked through below: establishing the master-slave relationship, setting up the network connection, the handshake, full synchronization, and command propagation.

There is also a partial resynchronization implementation, covered at the end.

Establishing the master-slave relationship

Executing slaveof

No matter which way replication is configured in Redis, the operation is always carried out on the slave side. When the slaveof command is executed, the following function is called:

void replicaofCommand(client *c) {
    // Replication cannot be configured while the server is running in cluster mode
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    // SLAVEOF NO ONE turns replication off and promotes this slave back to a master; the data set obtained through replication is kept, not discarded.
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // Only if a master host is currently recorded
        if (server.masterhost) {
            // Cancel replication and turn this server back into a master
            replicationUnsetMaster();
            // Collect the client's info as an sds string and log it
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if (c->flags & CLIENT_SLAVE)
        {
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        // If we are already replicating from this exact host and port, do nothing and return
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        // First-time setup, or re-pointing to a different master:
        // record the master IP and port used for replication
        replicationSetMaster(c->argv[1]->ptr, port);
        // Collect the client's info as an sds string and log it
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }

    // Reply OK
    addReply(c,shared.ok);
}

When a client issues slaveof, the command is encoded in the Redis protocol and sent to the slave server, which dispatches it to replicaofCommand(). Besides the checks shown above, the key step the function performs is calling replicationSetMaster():

void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    /* Force our slaves to resync with us as well. They may hopefully be able
     * to partially resync with us, but we can notify the replid change. */
    disconnectSlaves();
    cancelReplicationHandshake();
    /* Before destroying our master state, create a cached master using
     * our own parameters, to later PSYNC with the new master. */
    if (was_master) replicationCacheMasterUsingMyself();
    server.repl_state = REPL_STATE_CONNECT;
}

As you can see, replicationSetMaster() mainly resets masterhost and masterport and tears down the previous connection state.
Because this slave may previously have been replicating from another master, everything cached about the old master is cleared and the old links are closed; finally the server's replication state is set:

server.repl_state = REPL_STATE_CONNECT   // must connect to the master
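
For orientation, the whole handshake described below is driven by this repl_state field. Summarized as an enum sketch (the real server.h defines these as numeric constants, and the exact values can differ between Redis versions), the replica-side states are:

/* Sketch: replica-side replication states as used throughout this article
 * (paraphrased from server.h; values are illustrative). */
typedef enum {
    REPL_STATE_NONE = 0,       /* no replication configured */
    REPL_STATE_CONNECT,        /* must connect to the master */
    REPL_STATE_CONNECTING,     /* non-blocking connect in progress */
    /* handshake states, traversed in order by syncWithMaster() */
    REPL_STATE_RECEIVE_PONG,   /* sent PING, waiting for the PONG reply */
    REPL_STATE_SEND_AUTH,      /* send AUTH <masterauth> */
    REPL_STATE_RECEIVE_AUTH,   /* waiting for the AUTH reply */
    REPL_STATE_SEND_PORT,      /* send REPLCONF listening-port */
    REPL_STATE_RECEIVE_PORT,   /* waiting for the REPLCONF reply */
    REPL_STATE_SEND_IP,        /* send REPLCONF ip-address */
    REPL_STATE_RECEIVE_IP,     /* waiting for the REPLCONF reply */
    REPL_STATE_SEND_CAPA,      /* send REPLCONF capa eof capa psync2 */
    REPL_STATE_RECEIVE_CAPA,   /* waiting for the REPLCONF reply */
    REPL_STATE_SEND_PSYNC,     /* send PSYNC <replid> <offset> */
    REPL_STATE_RECEIVE_PSYNC,  /* waiting for +FULLRESYNC / +CONTINUE */
    REPL_STATE_TRANSFER,       /* receiving the RDB payload */
    REPL_STATE_CONNECTED       /* steady-state command propagation */
} repl_state_sketch;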

Establishing the master-slave network connection

slaveof is an asynchronous command: it returns as soon as the master information has been recorded. The rest of the replication flow runs asynchronously, triggered by Redis's periodic timer.
When the Redis server initializes, serverCron() is installed as the time event handler, and it calls replicationCron() once per second:

/* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();
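
run_with_period() is just a macro that rate-limits a block inside serverCron() using the cron loop counter and server.hz. A minimal standalone sketch of the idea (simplified; the cronloops and hz globals here stand in for the corresponding server fields):

#include <stdio.h>

/* Sketch: run a block at most once every `ms` milliseconds from a cron
 * function that itself runs `hz` times per second. */
static int cronloops = 0;
static const int hz = 10;                 /* cron runs 10 times per second */

#define run_with_period(ms) \
    if (((ms) <= 1000/hz) || !(cronloops % ((ms)/(1000/hz))))

static void replicationCron(void) { printf("replicationCron tick\n"); }

int main(void) {
    for (int i = 0; i < 50; i++) {        /* simulate 5 seconds of cron calls */
        run_with_period(1000) replicationCron();  /* fires once per second */
        cronloops++;
    }
    return 0;
}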

replicationCron() checks the current replication state and performs the corresponding action:

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* First, send PING according to ping_slave_period. */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        /* Note that we don't send the PING if the clients are paused during
         * a Redis Cluster manual failover: the PING we send will otherwise
         * alter the replication offsets of master and slave, and will no longer
         * match the one stored into 'mf_master_offset' state. */
        int manual_failover_in_progress =
            server.cluster_enabled &&
            server.cluster->mf_end &&
            clientsArePaused();

        if (!manual_failover_in_progress) {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     *
     * Also send the a newline to all the chained slaves we have, if we lost
     * connection from our master, to keep the slaves aware that their
     * master is online. This is needed since sub-slaves only receive proxied
     * data from top-level masters, so there is no explicit pinging in order
     * to avoid altering the replication offsets. This special out of band
     * pings (newlines) can be sent, they will have no effect in the offset.
     *
     * The newline will be ignored by the slave but will refresh the
     * last interaction timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            if (write(slave->fd, "\n", 1) == -1) {
                /* Don't worry about socket errors, it's just a ping. */
            }
        }
    }

    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate != SLAVE_STATE_ONLINE) continue;
            if (slave->flags & CLIENT_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    /* If this is a master without attached slaves and there is a replication
     * backlog active, in order to reclaim memory we can free it after some
     * (configured) time. Note that this cannot be done for slaves: slaves
     * without sub-slaves attached should still accumulate data into the
     * backlog, in order to reply to PSYNC queries if they are turned into
     * masters after a failover. */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            /* When we free the backlog, we always use a new
             * replication ID and clear the ID2. This is needed
             * because when there is no backlog, the master_repl_offset
             * is not updated, but we would still retain our replication
             * ID, leading to the following problem:
             *
             * 1. We are a master instance.
             * 2. Our slave is promoted to master. It's repl-id-2 will
             *    be the same as our repl-id.
             * 3. We, yet as master, receive some updates, that will not
             *    increment the master_repl_offset.
             * 4. Later we are turned into a slave, connect to the new
             *    master that will accept our PSYNC request by second
             *    replication ID, but there will be data inconsistency
             *    because we received writes. */
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* Start a BGSAVE good for replication if we have slaves in
     * WAIT_BGSAVE_START state.
     *
     * In case of diskless replication, we make sure to wait the specified
     * number of seconds (according to configuration) so that other slaves
     * have the time to arrive before we start streaming. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            /* Start the BGSAVE. The called function may start a
             * BGSAVE with socket target or disk target depending on the
             * configuration and slaves capabilities. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}

When the master-slave relationship was set up, the state was already set to REPL_STATE_CONNECT, so this function calls connectWithMaster() to connect to the master server:

int connectWithMaster(void) {
    int fd;
    // Connect to the master without blocking
    fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
    if (fd == -1) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return C_ERR;
    }
    // Register readable and writable events on the master fd, handled by syncWithMaster
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        serverLog(LL_WARNING,"Can't create readable event for SYNC");
        return C_ERR;
    }
    // Time of the last transfer I/O, initialized to now
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}

In connectWithMaster() the slave connects to the master in a non-blocking way; once the connect is issued, it registers both the AE_READABLE and AE_WRITABLE events on the socket fd to watch for traffic, with syncWithMaster() as the handler. Finally the replication state is set to connecting:

server.repl_state = REPL_STATE_CONNECTING

At this point the network connection between the slave and the master is established.
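
anetTcpNonBlockBestEffortBindConnect() boils down to the standard non-blocking connect pattern; in isolation it can be sketched like this (error handling trimmed; nonblock_connect is a hypothetical helper, not a Redis function):

#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

/* Sketch: connect to master_ip:master_port without blocking the event loop.
 * Returns the socket fd (the connect may still be in progress) or -1. */
int nonblock_connect(const char *master_ip, int master_port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;

    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(master_port);
    inet_pton(AF_INET, master_ip, &sa.sin_addr);

    if (connect(fd, (struct sockaddr *)&sa, sizeof(sa)) == -1 &&
        errno != EINPROGRESS) {          /* EINPROGRESS: still handshaking */
        close(fd);
        return -1;
    }
    /* The caller registers fd for readable and writable events; the writable
     * event firing is what signals that the TCP connection is established. */
    return fd;
}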

Sending PING

When the network connection was set up, both read and write events were registered on the fd. Since it is the slave that initiates the connection, a write event fires first, invoking syncWithMaster() to handle it. The function is too long to reproduce in full, so only the logic relevant to the current step is shown each time:

/* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
        if (err) goto write_error;
        return;
    }

What this branch does is straightforward: it removes the writable event, moves the state to REPL_STATE_RECEIVE_PONG, and sends PING without waiting for the reply.

The handler for the PING command lives in server.c:

/* The PING command. It works in a different way if the client is in
 * in Pub/Sub mode. */
void pingCommand(client *c) {
    /* The command takes zero or one arguments. */
    if (c->argc > 2) {
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return;
    }

    if (c->flags & CLIENT_PUBSUB) {
        addReply(c,shared.mbulkhdr[2]);
        addReplyBulkCBuffer(c,"pong",4);
        if (c->argc == 1)
            addReplyBulkCBuffer(c,"",0);
        else
            addReplyBulk(c,c->argv[1]);
    } else {
        if (c->argc == 1)
            addReply(c,shared.pong);
        else
            addReplyBulk(c,c->argv[1]);
    }
}

As the function shows, when the client is not in Pub/Sub mode it simply replies +PONG (or echoes the single argument if one was given).
The reply then triggers a read event on the slave's socket, which is handled in syncWithMaster() again:

/* Receive the PONG command. */
    // State REPL_STATE_RECEIVE_PONG: waiting for the PONG reply
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        // Read the PONG reply from the master with sendSynchronousCommand()
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        // Only two replies are considered valid: "+PONG" and the authentication error "-NOAUTH".
        // Older versions replied with "-ERR operation not permitted" instead.
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

The slave reads the PONG from the socket and sets the replication state to REPL_STATE_SEND_AUTH, ready for authentication.

Authentication

Still in syncWithMaster(): after the state was updated in the previous block, execution falls through to the next if block, which performs authentication:

/* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        // If an authentication password (masterauth) is configured
        if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            // Wait for the AUTH reply
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            // No password configured: go straight to sending our port to the master
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

The slave first checks whether an authentication password (server.masterauth) is configured. If it is, the password is sent to the master with AUTH; if not, no authentication is needed and the replication state is set directly to REPL_STATE_SEND_PORT for the next step.

On the master, the AUTH command is handled by authCommand().

The master compares the password it receives (the slave's server.masterauth) with its own server.requirepass, and replies OK if they match.
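
authCommand() itself is not shown here; conceptually the check on the master amounts to the following hypothetical sketch (the real implementation uses a time-independent string comparison and its own reply helpers):

#include <stdio.h>
#include <string.h>

/* Sketch: what the master conceptually does with AUTH <password>.
 * The returned strings follow the RESP reply format. */
const char *auth_check(const char *requirepass, const char *password) {
    if (requirepass == NULL)
        return "-ERR Client sent AUTH, but no password is set\r\n";
    if (strcmp(requirepass, password) == 0)
        return "+OK\r\n";
    return "-ERR invalid password\r\n";
}

int main(void) {
    printf("%s", auth_check("s3cret", "s3cret"));   /* +OK */
    printf("%s", auth_check("s3cret", "wrong"));    /* -ERR invalid password */
    return 0;
}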

When the slave receives the reply, syncWithMaster() handles it:

/* Receive AUTH reply. */
// Waiting for the AUTH reply
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
    // Read the reply from the master
    err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
    // An error reply means authentication failed
    if (err[0] == '-') {
        serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
        sdsfree(err);
        goto error;
    }
    sdsfree(err);
    // Next step: send our listening port to the master
    server.repl_state = REPL_STATE_SEND_PORT;
}

If authentication succeeded, the state becomes REPL_STATE_SEND_PORT, just as when no password is configured.

Sending the listening port

Execution continues with the next if block:

/* Set the slave port, so that Master's INFO command can list the
 * slave listening port correctly. */
// Send the slave's listening port to the master, so that the master's INFO command can list it correctly
if (server.repl_state == REPL_STATE_SEND_PORT) {
    // Determine the port to announce
    sds port = sdsfromlonglong(server.slave_announce_port ?
        server.slave_announce_port : server.port);
    // Send it to the master with REPLCONF listening-port
    err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","listening-port",port, NULL);
    sdsfree(port);
    if (err) goto write_error;
    sdsfree(err);
    // Wait for the master's reply
    server.repl_state = REPL_STATE_RECEIVE_PORT;
    return;
}

On the master this is handled by replconfCommand(), which is too long to paste here; in short, the master stores the slave's port in the client structure (c->slave_listening_port = port) and replies OK.
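
replconfCommand() is omitted here for length; the listening-port branch conceptually does little more than the following hypothetical, much-simplified sketch (fake_client and replconf_listening_port are stand-ins, not Redis identifiers; the real function iterates over option/value argument pairs):

#include <stdio.h>
#include <stdlib.h>
#include <strings.h>

/* Sketch: the idea behind REPLCONF listening-port <port> on the master side. */
typedef struct { int slave_listening_port; } fake_client;

int replconf_listening_port(fake_client *c, const char *option, const char *value) {
    if (strcasecmp(option, "listening-port") == 0) {
        c->slave_listening_port = atoi(value);  /* later shown by INFO replication */
        return 0;                               /* master replies +OK */
    }
    return -1;                                  /* other options not handled here */
}

int main(void) {
    fake_client c = {0};
    if (replconf_listening_port(&c, "listening-port", "6380") == 0)
        printf("slave_listening_port = %d\n", c.slave_listening_port);
    return 0;
}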

After the slave receives the OK, syncWithMaster() checks whether the master accepted the port and, if so, advances the state again:

/* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

Once the master has acknowledged the port, the state moves on to sending the IP address.

Sending the IP address

This step works the same way as the previous one, so only the relevant code is shown:

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

The REPL_STATE_SEND_CAPA state means announcing the slave's capabilities, i.e. whether it can parse the EOF-delimited RDB stream format.

Sending CAPA

    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

The only thing to note here is that the master ORs the capabilities received via REPLCONF capa eof (and capa psync2) into the client's c->slave_capa bit field.
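
The capabilities end up as bits in the master-side client's slave_capa field, roughly like this (a sketch of the SLAVE_CAPA_* flags; the values shown are illustrative):

/* Sketch: slave capability bits accumulated on the master-side client.
 * Each "REPLCONF capa <name>" the master understands ORs one bit in. */
#define SLAVE_CAPA_NONE   0
#define SLAVE_CAPA_EOF    (1<<0)   /* can parse the EOF-delimited diskless RDB stream */
#define SLAVE_CAPA_PSYNC2 (1<<1)   /* understands +CONTINUE <new replid> */

/* e.g. for "REPLCONF capa eof capa psync2":
 *     c->slave_capa |= SLAVE_CAPA_EOF;
 *     c->slave_capa |= SLAVE_CAPA_PSYNC2;
 */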

Sending PSYNC

The slave now sends PSYNC to the master, asking it to synchronize the data set. The synchronization is either a full synchronization or a partial resynchronization.

// State REPL_STATE_SEND_PSYNC: try a partial resynchronization.
// If there is no cached master structure, slaveTryPartialResynchronization() will at least use PSYNC to obtain a full synchronization, which gives us the master's runid and global replication offset so that a partial resync can be attempted on the next reconnection.
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
    // Send the PSYNC command to the master; the argument 0 means "write the command only, do not read the reply yet"
    if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
        // Failed to send PSYNC
        err = sdsnew("Write error sending the PSYNC command.");
        goto write_error;
    }
    // Now wait for the PSYNC reply
    server.repl_state = REPL_STATE_RECEIVE_PSYNC;
    return;
}

The sync request is sent to the master by calling slaveTryPartialResynchronization(fd,0). The function is long, so only the relevant logic is shown; note that the second argument here is 0:

/* Writing half */
    if (!read_reply) {
        // Setting master_initial_offset to -1 marks the master's run_id and global replication offset as invalid for now.
        // If PSYNC ends up performing a full synchronization, the offset will be set correctly so it can be propagated to this node's own sub-slaves.
        server.master_initial_offset = -1;

        // There is a cached master, so a partial resync can be attempted: PSYNC <master_run_id> <repl_offset>
        if (server.cached_master) {
            // Use the cached replication id
            psync_replid = server.cached_master->replid;
            // And the offset already replicated, plus one
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
        // No cached master: send PSYNC ? -1 to request a full synchronization
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            aeDeleteFileEvent(server.el,fd,AE_READABLE);
            return PSYNC_WRITE_ERROR;
        }
        // Return PSYNC_WAIT_REPLY; the caller will set read_reply to 1 and call this function again to run the reading half below
        return PSYNC_WAIT_REPLY;
    }

When read_reply is 0 the function performs the write half, i.e. it sends a PSYNC command to the master. Since this is the first synchronization request, the slave's cached master (server.cached_master) is empty, so PSYNC ? -1 is sent, requesting a full synchronization.

The master handles the request in syncCommand():

void syncCommand(client *c) {
    ..........// some precondition checks removed here for brevity

    // Try a partial resynchronization if this is a PSYNC command: on failure masterTryPartialResynchronization() has already replied with "+FULLRESYNC <runid> <offset>" and a full synchronization follows
    // This is why a slave that gets disconnected knows the runid and offset, and tries PSYNC first when it reconnects
    // PSYNC command:
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // The master tries a partial resynchronization; C_OK means it was accepted
        if (masterTryPartialResynchronization(c) == C_OK) {
            // Partial resync accepted: bump the counter of successful PSYNCs
            server.stat_sync_partial_ok++;
            // No full resync needed, return
            return; /* No full resync needed, return. */
        // Partial resync not possible: fall through to a full synchronization
        } else {
            char *master_runid = c->argv[1]->ptr;
            // Count this as a PSYNC failure only if the slave was not explicitly forcing a full resync with "?"
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    // SYNC command:
    } else {
        // Flag the client as pre-PSYNC: plain SYNC, no REPLCONF ACK expected
        c->flags |= CLIENT_PRE_PSYNC;
    }
    // Count one more full resynchronization
    server.stat_sync_full++;

    // Mark this slave as waiting for a BGSAVE to start
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    // Should TCP_NODELAY be disabled for the SYNC transfer?
    if (server.repl_disable_tcp_nodelay)
        // If so, Nagle's algorithm is effectively enabled
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    // fd of the RDB file that will be sent to this slave, -1 for now
    c->repldbfd = -1;
    // Flag the client as a slave server
    c->flags |= CLIENT_SLAVE;
    // Add it to the server's list of slaves
    listAddNodeTail(server.slaves,c);

    /* CASE 1: BGSAVE is in progress, with disk target. */
    // Case 1: a BGSAVE is already in progress, writing to disk
    if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        client *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        // Walk the list of slaves
        while((ln = listNext(&li))) {
            slave = ln->value;
            // If some slave is already waiting for this BGSAVE child to finish, stop at it
            // (its SLAVE_STATE_WAIT_BGSAVE_END state is set in case 3)
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
        }
        // Check whether that slave's capabilities cover the new slave, i.e. the current BGSAVE output is usable for it
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
            // Copy that slave's entire output buffer into the new slave's output buffer
            copyClientOutputBuffer(c,slave);
            // Set the new slave up for a full resync, reusing the existing psync_initial_offset
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
        }

    /* CASE 2: BGSAVE is in progress, with socket target. */
    // Case 2: a BGSAVE is in progress, but it is a diskless one writing straight to the slaves' sockets
    } else if (server.rdb_child_pid != -1 &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
    {
        // The child writes directly to other sockets, so this slave has to wait for the next BGSAVE
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

    /* CASE 3: There is no BGSAVE is progress. */
    // Case 3: no BGSAVE is in progress
    } else {
        // Diskless sync is enabled and the slave can parse the EOF-delimited stream
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
            // The child for diskless sync is started from replicationCron(), delayed so that more slaves have a chance to arrive
            if (server.repl_diskless_sync_delay)
                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
        // Disk-based replication
        } else {
            // If no AOF rewrite is running, start a BGSAVE for replication right away, writing the RDB file to disk
            if (server.aof_child_pid == -1) {
                startBgsaveForReplication(c->slave_capa);
            } else {
                serverLog(LL_NOTICE,
                    "No BGSAVE in progress, but an AOF rewrite is active. BGSAVE for replication delayed");
            }
        }
    }

    // This is the first slave and there is no backlog yet: create one
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}

Since this is the first synchronization, the partial resync attempted for the PSYNC command fails and a full synchronization begins; the slave's replication state on the master is set to SLAVE_STATE_WAIT_BGSAVE_START.
The rest of the handling splits into the three cases shown above.

For this first, full synchronization the relevant case is the third one, which calls startBgsaveForReplication():

// Start a BGSAVE for replication; depending on configuration the RDB target is either the disk or the slaves' sockets. The script cache is flushed before starting.
// The mincapa argument is the bitwise AND of the SLAVE_CAPA_* flags of the waiting slaves
int startBgsaveForReplication(int mincapa) {
    int retval;
    // Should the RDB be streamed directly to the sockets?
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;

    if (socket_target)
        // Diskless target: fork a child that writes the RDB
        // directly to the sockets of the slaves waiting for BGSAVE to start
        retval = rdbSaveToSlavesSockets();
    else
        // Otherwise run a background save (BGSAVE) of the RDB file to disk
        retval = rdbSaveBackground(server.rdb_filename);

    ......

    // When streaming to sockets, rdbSaveToSlavesSockets() has already set the slaves up for a full resync
    // For the disk target it is done here:
    if (!socket_target) {
        listRewind(server.slaves,&li);
        // Walk the list of slaves
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            // For every slave still waiting for the BGSAVE to start
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                    // set it up for a full resynchronization
                    replicationSetupSlaveForFullResync(slave,
                            getPsyncInitialOffset());
            }
        }
    }
}

startBgsaveForReplication() then sets up every slave that was waiting in SLAVE_STATE_WAIT_BGSAVE_START for a full resynchronization, sending it the +FULLRESYNC reply together with the master's runid and the global replication offset server.master_repl_offset:

int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char buf[128];
    int buflen;

    // Record the offset at which this full resync starts
    slave->psync_initial_offset = offset;
    // Advance the slave's state; from now on the diff is accumulated in its output buffer
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
    // Setting slaveseldb to -1 forces a SELECT command to be emitted in the replication stream
    server.slaveseldb = -1;

    // If the client carries the CLIENT_PRE_PSYNC flag it is a pre-2.8 Redis that only knows SYNC,
    // so the +FULLRESYNC line is not sent to it (partial resync appeared later)
    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                          server.runid,offset);
        // Write the full resync header to the slave
        if (write(slave->fd,buf,buflen) != buflen) {
            freeClientAsync(slave);
            return C_ERR;
        }
    }
    return C_OK;
}

On the slave, the read event fires and syncWithMaster() is invoked again, this time executing:

psync_result = slaveTryPartialResynchronization(fd,1);

This time the second argument is 1, so the reading half of the function runs:

/* Reading half */
// Read one reply from the master into reply
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
    // The master may send an empty newline after receiving PSYNC just to keep the connection alive
    sdsfree(reply);
    // so return PSYNC_WAIT_REPLY; the caller will set read_reply to 1 and call this function again.
    return PSYNC_WAIT_REPLY;
}
// We got a real reply: remove the readable event from fd for now
aeDeleteFileEvent(server.el,fd,AE_READABLE);

// "+FULLRESYNC": the master will perform a full synchronization
if (!strncmp(reply,"+FULLRESYNC",11)) {
    char *runid = NULL, *offset = NULL;
    // Parse the reply, extracting the runid and the replication offset
    runid = strchr(reply,' ');
    if (runid) {
        runid++;    // now points at the runid
        offset = strchr(runid,' ');
        if (offset) offset++;   // now points at the offset
    }
    // If either runid or offset is missing, something unexpected happened
    if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
        serverLog(LL_WARNING,"Master replied with wrong +FULLRESYNC syntax.");
        // Reset the saved master run ID
        memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
    // runid and offset parsed successfully
    } else {
        // Save the master's run ID
        memcpy(server.repl_master_runid, runid, offset-runid-1);
        server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
        // And the master's initial offset
        server.repl_master_initial_offset = strtoll(offset,NULL,10);
        serverLog(LL_NOTICE,"Full resync from master: %s:%lld",server.repl_master_runid,          server.repl_master_initial_offset);
    }
    // A full sync will follow, so the cached master structure is useless: discard it
    replicationDiscardCachedMaster();
    sdsfree(reply);
    // Report a full resynchronization to the caller
    return PSYNC_FULLRESYNC;
}

// "+CONTINUE": the master accepted a partial resynchronization
if (!strncmp(reply,"+CONTINUE",9)) {
    serverLog(LL_NOTICE,"Successful partial resynchronization with master.");
    sdsfree(reply);
    // For a partial resync the cached master structure is turned back into the current master being synced from
    replicationResurrectCachedMaster(fd);
    // Report that the partial resync continues
    return PSYNC_CONTINUE;
}

// An error was received; there are two possibilities:
// 1. the master does not support PSYNC (Redis older than 2.8)
// 2. an unexpected reply was read from the master
if (strncmp(reply,"-ERR",4)) {
    /* If it's not an error, log the unexpected event. */
    serverLog(LL_WARNING,"Unexpected reply to PSYNC from master: %s", reply);
} else {
    serverLog(LL_NOTICE,"Master does not support PSYNC or is in error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
// Report that PSYNC is not supported
return PSYNC_NOT_SUPPORTED;

The function handles the three kinds of replies the master can send. Execution then continues in syncWithMaster():

// If we get here, psync_result == PSYNC_FULLRESYNC or PSYNC_NOT_SUPPORTED
    // Prepare a suitable temporary file to store the RDB data sent by the master
    while(maxtries--) {
        // Build the file name
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        // Open the temp file write-only, creating it exclusively, with mode 0644
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        // Opened successfully: leave the loop
        if (dfd != -1) break;
        sleep(1);
    }
    /* Setup the non blocking download of the bulk file. */
    // Register a readable event on fd, handled by readSyncBulkPayload
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    // Replication state: receiving the RDB file from the master
    server.repl_state = REPL_STATE_TRANSFER;
    // Size of the RDB file, not known yet
    server.repl_transfer_size = -1;
    // Bytes read so far
    server.repl_transfer_read = 0;
    // Offset of the last fsync
    server.repl_transfer_last_fsync_off = 0;
    // fd of the temporary file the RDB is written to
    server.repl_transfer_fd = dfd;
    // Last time transfer I/O happened
    server.repl_transfer_lastio = server.unixtime;
    // Name of the temporary RDB file
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

syncWithMaster() opens a temporary file to hold the RDB data the master will send, and registers a read event on the fd with readSyncBulkPayload() as the handler.
That function is fairly involved, so its code is not reproduced here; roughly, it reads the bulk data sent by the master into a buffer, writes the buffer out to the temporary file and, when the transfer is complete, loads the file into the slave's database and syncs it to disk.
As shown above, the replication state is set to REPL_STATE_TRANSFER and the transfer bookkeeping (RDB size, offsets and so on) is initialized.
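
Since readSyncBulkPayload() is not reproduced here, the core of its disk-backed path can be sketched as: read the "$<length>\r\n" header, then stream that many bytes into the temp file. The sketch below is blocking and heavily simplified (the real function is event-driven, also handles the diskless EOF-marker format, fsyncs periodically and finally calls rdbLoad()); download_rdb_payload is a hypothetical helper:

#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Sketch: download a "$<len>\r\n<len bytes>" bulk payload from sockfd into
 * the already opened temp file tmpfd. */
int download_rdb_payload(int sockfd, int tmpfd) {
    char hdr[64];
    size_t i = 0;

    /* read the "$<len>\r\n" header byte by byte */
    while (i < sizeof(hdr) - 1) {
        if (read(sockfd, hdr + i, 1) != 1) return -1;
        if (i > 0 && hdr[i-1] == '\r' && hdr[i] == '\n') break;
        i++;
    }
    hdr[i+1] = '\0';
    if (hdr[0] != '$') return -1;
    long long remaining = strtoll(hdr + 1, NULL, 10);   /* RDB size in bytes */

    char buf[4096];
    while (remaining > 0) {
        ssize_t n = read(sockfd, buf,
                         remaining > (long long)sizeof(buf) ? sizeof(buf)
                                                            : (size_t)remaining);
        if (n <= 0) return -1;
        if (write(tmpfd, buf, (size_t)n) != n) return -1;
        remaining -= n;
    }
    /* At this point the real code renames the temp file, empties the old
     * data set, loads the RDB with rdbLoad() and flips the replication
     * state to REPL_STATE_CONNECTED. */
    return 0;
}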

After replying with +FULLRESYNC the master takes no immediate further action; sending the RDB file is also driven by the time event, through the following call chain:

serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()->updateSlavesWaitingBgsave()
/* This function is called at the end of every background saving,
 * or when the replication RDB transfer strategy is modified from
 * disk to socket or the other way around.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization, and
 * to schedule a new BGSAVE if there are slaves that attached while a
 * BGSAVE was in progress, but it was not a good one for replication (no
 * other slave was accumulating differences).
 *
 * The argument bgsaveerr is C_OK if the background saving succeeded
 * otherwise C_ERR is passed to the function.
 * The 'type' argument is the type of the child that terminated
 * (if it had a disk or socket target). */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
    listNode *ln;
    int startbgsave = 0;
    int mincapa = -1;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            startbgsave = 1;
            mincapa = (mincapa == -1) ? slave->slave_capa :
                                        (mincapa & slave->slave_capa);
        } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            /* If this was an RDB on disk save, we have to prepare to send
             * the RDB from disk to the slave socket. Otherwise if this was
             * already an RDB -> Slaves socket transfer, used in the case of
             * diskless replication, our work is trivial, we can just put
             * the slave online. */
            if (type == RDB_CHILD_TYPE_SOCKET) {
                serverLog(LL_NOTICE,
                    "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
                        replicationGetSlaveName(slave));
                /* Note: we wait for a REPLCONF ACK message from slave in
                 * order to really put it online (install the write handler
                 * so that the accumulated data can be transferred). However
                 * we change the replication state ASAP, since our slave
                 * is technically online now. */
                slave->replstate = SLAVE_STATE_ONLINE;
                slave->repl_put_online_on_ack = 1;
                slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
            } else {
                if (bgsaveerr != C_OK) {
                    freeClient(slave);
                    serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
                    continue;
                }
                if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                    redis_fstat(slave->repldbfd,&buf) == -1) {
                    freeClient(slave);
                    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                    continue;
                }
                slave->repldboff = 0;
                slave->repldbsize = buf.st_size;
                slave->replstate = SLAVE_STATE_SEND_BULK;
                slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                    (unsigned long long) slave->repldbsize);

                aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
                if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                    freeClient(slave);
                    continue;
                }
            }
        }
    }
    if (startbgsave) startBgsaveForReplication(mincapa);
}

updateSlavesWaitingBgsave() opens the freshly produced RDB file read-only and sets the slave client's replication state to SLAVE_STATE_SEND_BULK.
It removes any previously registered write event and immediately installs a new one with sendBulkToSlave() as the handler.
When the event fires, the handler writes the RDB file to the fd, while the slave processes the incoming data with readSyncBulkPayload().

void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *slave = privdata;
    UNUSED(el);
    UNUSED(mask);
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Before sending the RDB file, we send the preamble as configured by the
     * replication process. Currently the preamble is just the bulk count of
     * the file in the form "$<length>\r\n". */
    if (slave->replpreamble) {
        nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += nwritten;
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }

    /* If the preamble was already transferred, send the RDB bulk data. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        serverLog(LL_WARNING,"Read error sending DB to replica: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        if (errno != EAGAIN) {
            serverLog(LL_WARNING,"Write error sending DB to replica: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        putSlaveOnline(slave);
    }
}

sendBulkToSlave() first writes the RDB file size preamble ($<length>\r\n) to the slave, then reads chunks of the RDB file from repldbfd and writes them to the fd. Once the whole file has been sent, it removes the writable event (it will be registered again when buffered data needs to be sent) and calls putSlaveOnline(), which sets the slave's replication state to SLAVE_STATE_ONLINE. The RDB transfer is now complete and the buffered updates are sent next.

Sending the output buffer

After the RDB file has been sent, putSlaveOnline() registers a new writable event with sendReplyToClient() as the handler, passing the slave's client object as private data, i.e. as the buffer to drain. That handler ends up calling writeToClient(), which flushes the client's buffered replies to the slave, bringing the slave's database in line with the master's.

/* Write data in output buffers to client. Return C_OK if the client
 * is still valid after the call, C_ERR if it was freed. */
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave (otherwise, on high-speed traffic, the replication
         * buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

Command propagation

Once the RDB file has been delivered to the slave, steady-state replication begins: every write command executed on the master is also sent to its slaves for execution. The function responsible is replicationFeedSlaves():

// Propagate the command in argv/argc to the slaves (and the backlog)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];

    // Nothing to do if there is no backlog and no attached slaves
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    // If the database last selected in the replication stream is not the target one, a SELECT command must be emitted first
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // For 0 <= id < 10 a shared SELECT command object can be used
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        // Otherwise build the SELECT command in protocol format
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }
        // Append the SELECT command to the backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        // And send it to the slaves
        listRewind(slaves,&li);
        // Iterate over all attached slaves
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            // Slaves still waiting for BGSAVE to start are skipped
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            // Append the SELECT command to this slave's reply
            addReply(slave,selectcmd);
        }
        // Free the temporary object if one was created
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    // Remember which database the replication stream now refers to
    server.slaveseldb = dictid;

    // Write the command to the backlog
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        // Encode the argument count in protocol format
        // *<argc>\r\n
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        // Append it to the backlog
        feedReplicationBacklog(aux,len+3);

        // For every argument
        for (j = 0; j < argc; j++) {
            // Length of this argument object
            long objlen = stringObjectLen(argv[j]);

            // Encode it in protocol format and append it to the backlog:
            // $<len>\r\n<argv>\r\n
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            // Append $<len>\r\n
            feedReplicationBacklog(aux,len+3);
            // Append the argument object <argv>
            feedReplicationBacklogWithObject(argv[j]);
            // Append the trailing \r\n
            feedReplicationBacklog(aux+len+1,2);
        }
    }
    // Write the command to every slave
    listRewind(server.slaves,&li);
    // Iterate over the list of slaves
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        // Slaves still waiting for BGSAVE to start are skipped
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        // The command is written both to slaves waiting for their initial SYNC to finish (it queues up in their output buffer) and to slaves already in sync
        /* Add the multi bulk length. */
        // Add the multibulk header with the argument count
        addReplyMultiBulkLen(slave,argc);

        // Append each argument to the slave's output buffer
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
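
The "*<argc>\r\n" / "$<len>\r\n<arg>\r\n" framing built with the aux buffer above is simply the standard RESP encoding of a command. As a standalone illustration (resp_encode is a hypothetical helper operating on plain C strings instead of robj arguments):

#include <stdio.h>
#include <string.h>

/* Sketch: encode argv into the RESP multibulk form that gets appended to the
 * replication backlog and to each slave's output buffer, e.g.
 * SET key value  ->  "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n" */
size_t resp_encode(char *dst, size_t cap, int argc, const char **argv) {
    size_t off = 0;
    off += (size_t)snprintf(dst + off, cap - off, "*%d\r\n", argc);
    for (int j = 0; j < argc; j++) {
        off += (size_t)snprintf(dst + off, cap - off, "$%zu\r\n%s\r\n",
                                strlen(argv[j]), argv[j]);
    }
    return off;   /* bytes written, assuming the buffer was large enough */
}

int main(void) {
    const char *argv[] = { "SET", "key", "value" };
    char buf[128];
    resp_encode(buf, sizeof(buf), 3, argv);
    printf("%s", buf);
    return 0;
}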

Partial resynchronization

Everything above describes the full synchronization flow. In practice, network problems can break the connection, and the partial resynchronization mechanism exists to keep replication reliable across such interruptions.

Heartbeat mechanism

After the master-slave connection is established it is kept alive with heartbeat commands. By default the master sends a PING to its slaves every 10 seconds to check their connection state, controlled by the repl-ping-slave-period configuration option.

// First, send PING to the attached slaves at the configured ping period
// (this only applies when the current node is acting as a master for some slave)
if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
    // Create the PING command object
    ping_argv[0] = createStringObject("PING",4);
    // Feed it to the slaves
    replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
    decrRefCount(ping_argv[0]);
}

The slave, from its main loop, sends REPLCONF ACK <offset> to the master once per second by default, reporting the replication offset it has processed so far:

// Periodically send an ACK to the master (except to pre-PSYNC masters)
if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC))
    // Send REPLCONF ACK to the master to report the offset processed so far
    replicationSendAck();
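
On the wire this ACK is just another RESP command; a small sketch of the bytes a replica sends for REPLCONF ACK <offset> (format_replconf_ack is a hypothetical helper, and reploff stands in for the offset the slave has processed):

#include <stdio.h>

/* Sketch: format "REPLCONF ACK <offset>" as it appears on the wire. */
int format_replconf_ack(char *dst, size_t cap, long long reploff) {
    char off[32];
    int offlen = snprintf(off, sizeof(off), "%lld", reploff);
    return snprintf(dst, cap,
        "*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$%d\r\n%s\r\n", offlen, off);
}

int main(void) {
    char buf[128];
    format_replconf_ack(buf, sizeof(buf), 31337);
    fputs(buf, stdout);
    return 0;
}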

Each run of the periodic task replicationCron() also checks whether the interaction between slave and master has timed out; if so the connection is dropped, and in a later cycle the slave reconnects to the master and replication resumes.

The replication backlog

The replication backlog is a circular buffer, 1MB by default. During command propagation the master also writes every command into this buffer. Because it is bounded, if a slave stays disconnected long enough that more than the backlog size of new writes accumulate, the oldest data is overwritten; when that slave reconnects, a partial resynchronization is no longer possible and a full synchronization must be performed instead.
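
The backlog itself is little more than a fixed-size ring buffer plus a few counters. A self-contained sketch of the write path, modeled on feedReplicationBacklog() (the struct and its fields are stand-ins for the corresponding server.repl_backlog_* fields):

#include <string.h>

/* Sketch of the replication backlog write path: a ring buffer plus
 * offset bookkeeping. */
typedef struct {
    char      *buf;
    long long  size;        /* total capacity (1MB by default in Redis) */
    long long  idx;         /* next write position inside buf */
    long long  histlen;     /* how much valid history the backlog holds */
    long long  master_off;  /* global replication offset (master_repl_offset) */
    long long  backlog_off; /* replication offset of the oldest byte kept */
} backlog;

void backlog_feed(backlog *b, const void *ptr, size_t len) {
    const char *p = ptr;
    b->master_off += (long long)len;
    while (len) {
        size_t thislen = (size_t)(b->size - b->idx);   /* room before wrapping */
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += (long long)thislen;
        if (b->idx == b->size) b->idx = 0;             /* wrap around */
        len -= thislen;
        p += thislen;
        b->histlen += (long long)thislen;
    }
    if (b->histlen > b->size) b->histlen = b->size;    /* old data overwritten */
    /* offset of the oldest byte still available in the backlog */
    b->backlog_off = b->master_off - b->histlen + 1;
}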

When the slave reconnects, the master again runs the syncCommand() function shown earlier, which calls masterTryPartialResynchronization() to try a partial resync first:

// This function handles the PSYNC command from the point of view of the master receiving a partial resynchronization request
// It returns C_OK if the partial resync is accepted, C_ERR otherwise
int masterTryPartialResynchronization(client *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;   // the master run ID provided by the slave
    char buf[128];
    int buflen;

    // Does the run ID provided with the slave's PSYNC match this master's run ID?
    // If not, the slave was replicating from a different instance, so the old replication stream cannot be continued
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id "?" is used by slaves that want to force a full resync. */
        // A run ID of "?" means the slave wants to force a full synchronization
        if (master_runid[0] != '?') {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
                master_runid, server.runid);
        } else {
            serverLog(LL_NOTICE,"Full resync requested by slave %s",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    // Parse psync_offset from the command argument
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       C_OK) goto need_full_resync;
    // If psync_offset is smaller than repl_backlog_off, part of the data the slave needs has already been overwritten in the backlog, so a full resync is required
    // If psync_offset is larger than (server.repl_backlog_off + server.repl_backlog_histlen), the backlog does not contain that much history either, so again a full resync is required
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    // If we get here, a partial resynchronization can be performed:
    // 1. flag the client as a slave
    // 2. send +CONTINUE to the slave to signal that the partial resync was accepted
    // 3. send the missing backlog data to the slave

    // Flag the client as a slave
    c->flags |= CLIENT_SLAVE;
    // Mark it online right away: no RDB transfer is needed, only the missing stream
    c->replstate = SLAVE_STATE_ONLINE;
    // Record the time of the last ACK from this slave
    c->repl_ack_time = server.unixtime;
    // No need to wait for a REPLCONF ACK before treating it as online
    c->repl_put_online_on_ack = 0;
    // Add this client to the list of slaves
    listAddNodeTail(server.slaves,c);
    // Send +CONTINUE to the slave
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }
    // Send the backlog data to the slave
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset);
    // Recompute the number of slaves whose lag is below min-slaves-max-lag
    refreshGoodSlavesCount();
    return C_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    return C_ERR;
}

If a partial resynchronization is possible, the master replies "+CONTINUE\r\n" to the slave and then calls addReplyReplicationBacklog(), which copies the missing data out of the backlog and sends it to the slave.
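
Continuing the ring-buffer sketch from the backlog section above, the read side performed by addReplyReplicationBacklog() looks roughly like this (backlog_copy_from is a hypothetical helper; the real function appends the data straight to the slave client's reply):

/* Sketch: copy everything from replication offset psync_offset onwards out of
 * the ring buffer, as addReplyReplicationBacklog() does for a partial resync.
 * Uses the backlog struct from the earlier sketch. */
long long backlog_copy_from(const backlog *b, long long psync_offset,
                            char *dst, size_t cap) {
    /* how far past the oldest available byte the request starts */
    long long skip = psync_offset - b->backlog_off;
    /* position of the oldest byte inside the ring */
    long long j = (b->idx + (b->size - b->histlen)) % b->size;
    j = (j + skip) % b->size;

    long long len = b->histlen - skip;      /* bytes the slave is missing */
    long long copied = 0;
    while (len > 0 && (size_t)copied < cap) {
        long long thislen = (b->size - j < len) ? (b->size - j) : len;
        if ((size_t)(copied + thislen) > cap) thislen = (long long)cap - copied;
        memcpy(dst + copied, b->buf + j, (size_t)thislen);
        copied += thislen;
        len -= thislen;
        j = 0;                              /* any further data starts at the ring's beginning */
    }
    return copied;
}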

Note:
During a partial resynchronization the whole connection and handshake sequence described for full synchronization is repeated; the only difference is that when the slave sends PSYNC, the master replies +CONTINUE instead of +FULLRESYNC.
