redis网络连接

2018-12-16  本文已影响0人  剑雪封侯之

客户端定义

redis使用了一对多服务器程序:一个服务器可以与多个客户端进行网络连接,向多个客户端提供服务。服务端为客户端建立的相应的客户端结构来保存客户端的相关信息,位于文件redis.h中。

typedef struct redisClient {
    uint64_t id;            /* Client incremental unique ID. */
    //客户端的套接字,伪客户端的为-1(AOF,Lua脚本),其它为普通客户端
    int fd;
    //客户端正在使用的redis数据库指针
    redisDb *db;
    int dictid;
    //客户端的名字,使用client setname 来设置名称
    robj *name;             /* As set by CLIENT SETNAME */
    //输入缓存区
    sds querybuf;
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */
    //将querybuf中解析完成后,放入argv中,个数放于argc中
    int argc;
    robj **argv;
    //redis 命令的实现函数
    struct redisCommand *cmd, *lastcmd;
    int reqtype;
    int multibulklen;       /* number of multi bulk arguments left to read */
    long bulklen;           /* length of bulk argument in multi bulk request */
    //回复缓冲区,用来保存回复比较大的内容,使用链表来保存
    list *reply;
    unsigned long reply_bytes; /* Tot bytes of objects in reply list */
    int sentlen;            /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    //创建客户端的时间
    time_t ctime;           /* Client creation time */
    //最后一次交互时间,可以用来计算该连接的空转时间
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    //缓冲区第一次达到软性限制的时间
    time_t obuf_soft_limit_reached_time;
    //redis标志位
    int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
    //身份验证,0表示未通过身份验证,1表示通过身份验证。仅在服务端启用了安全验证时才用,若启用
    //未通过验证,则只能使用AUTH命令,其它均无法使用
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    off_t repldboff;        /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
    long long reploff;      /* replication offset if this is our master */
    long long repl_ack_off; /* replication ack offset, if this is a slave */
    long long repl_ack_time;/* replication ack time, if this is a slave */
    char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    multiState mstate;      /* MULTI/EXEC state */
    blockingState bpop;   /* blocking state */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */

    /* Response buffer */
    //输出缓冲区,固定大小的大小,用来保存长度比较小的回复
    int bufpos;
    //默认大小为16k
    char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;

服务端接受客户端连接

void initServer(void) {
   ...
    //创建evenloop
   server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
    ...
    //创建socket监听套接字
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
    ...
    //增加网络套接字的监听,收到可读事件后,接受连接
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
   ... 
}

//接受tcp连接
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        //接受连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        //根据fd生成客户端结构
        acceptCommonHandler(cfd,0);
    }
}
//接受本地套接字的连接
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, max = MAX_ACCEPTS_PER_CALL;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
    }
}

//根据连接的文件描述符创建一个客户端
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
     //如果超过最大的客户端个数,则向客户端发送错误原因,同时关闭该连接
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

上一篇 下一篇

猜你喜欢

热点阅读