Redis客户端与服务器

2018-05-03  本文已影响0人  涵仔睡觉

客户端

Redis服务器状态结构的clients属性是一个链表,这个链表保存了所有与服务器连接的客户端状态结构:

image
/* 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。多个客户端状态被服务器用链表连接起来。 */
typedef struct redisClient {
    int fd;    // 套接字描述符
    redisDb *db;    // 当前正在使用的数据库
    int dictid; // 当前正在使用的数据库的 id (号码)   
    robj *name;      // 客户端的名字
    sds querybuf;  // 查询缓冲区
    size_t querybuf_peak; // 查询缓冲区长度峰值
    int argc; // 参数数量
    robj **argv; // 参数对象数组
    struct redisCommand *cmd, *lastcmd; // 记录被客户端执行的命令
    int reqtype;// 请求的类型:内联命令还是多条命令
    int multibulklen;    // 剩余未读取的命令内容数量
    long bulklen;      // 命令内容的长度
    list *reply;  // 回复链表
    unsigned long reply_bytes;  // 回复链表中对象的总大小
    int sentlen;      // 已发送字节,处理 short write 用      
    time_t ctime;        // 创建客户端的时间
    time_t lastinteraction;     // 客户端最后一次和服务器互动的时间
    time_t obuf_soft_limit_reached_time;   // 客户端的输出缓冲区超过软性限制的时间
    int flags;       // 客户端状态标志
    int authenticated;     // 当 server.requirepass 不为 NULL 时代表认证的状态:0 代表未认证, 1 代表已认证
    int replstate;  // 复制状态
    int repldbfd;   // 用于保存主服务器传来的 RDB 文件的文件描述符
    off_t repldboff;  // 读取主服务器传来的 RDB 文件的偏移量
    off_t repldbsize;      // 主服务器传来的 RDB 文件的大小
    sds replpreamble;       /* replication DB preamble. */
    long long reploff;  // 主服务器的复制偏移量
    long long repl_ack_off;  // 从服务器最后一次发送 REPLCONF ACK 时的偏移量
    long long repl_ack_time;  // 从服务器最后一次发送 REPLCONF ACK 的时间
    char replrunid[REDIS_RUN_ID_SIZE+1];  // 主服务器的 master run ID,保存在客户端,用于执行部分重同步
    int slave_listening_port;  // 从服务器的监听端口号
    multiState mstate;     // 事务状态
    int btype;   // 阻塞类型
    blockingState bpop; // 阻塞状态
    long long woff; // 最后被写入的全局复制偏移量
    list *watched_keys;   // 被监视的键
    dict *pubsub_channels;  // 这个字典记录了客户端所有订阅的频道,键为频道名字,值为 NULL,也即是,一个频道的集合
    list *pubsub_patterns;      // 链表,包含多个 pubsubPattern 结构,记录了所有订阅频道的客户端的信息,新 pubsubPattern 结构总是被添加到表尾
    sds peerid;             /* Cached peer ID. */

    /* Response buffer */
    int bufpos;  // 回复偏移量
    char buf[REDIS_REPLY_CHUNK_BYTES]; // 回复缓冲区
} redisClient;
  1. fd:套接字描述符
  1. name:客户端名称,默认情况下没有名字,可用setname命令设置
  2. flags:标志客户端角色以及当前状态
  3. querybuf:输入缓冲区,其大小根据输入内容动态伸缩,但最大不超过1GB,否则服务器将关闭这个客户端
  4. argc、argv:命令参数个数以及命令参数(数组,元素为字符串对象),argv[0]是要执行的命令
  5. cmd:有一个命令表,该表是一个字典,键是一个SDS结构,保存了命令的名字,值是对应的redisCommand结构,保存了命令的实现函数、命令的标志、参数个数、执行次数和消耗时长等信息。根据argv[0],可以从该命令表中找到对应的redisCommand结构,将cmd指向该结构。
  6. 输出缓冲区
  1. authenticated:身份验证,0表示未通过验证,1表示已通过验证
  2. ctime、lastinteraction、obuf_soft_limit_reached_time:时间属性,分别表示客户端创建时间、最后一次跟服务器互动时间以及客户端空转时间。

服务器

Redis服务器负责与多个客户端建立网络连接,处理客户端发送的命令请求,在数据库中保存客户端执行命令所产生的数据,并通过资源管理来维持服务器自身的运转。

struct redisServer {
    /* General */    
    char *configfile;       // 配置文件的绝对路径
    int hz;           // serverCron() 每秒调用的次数
    redisDb *db; // 数据库
    dict *commands;    // 命令表(受到 rename 配置选项的作用)
    dict *orig_commands;     // 命令表(无 rename 配置选项的作用)    
    aeEventLoop *el;// 事件状态
    unsigned lruclock:REDIS_LRU_BITS;   // 最近一次使用时钟
    int shutdown_asap;    // 关闭服务器的标识
    int activerehashing;    // 在执行 serverCron() 时进行渐进式 rehash
    char *requirepass;  // 是否设置了密码
    char *pidfile;  // PID 文件
    int arch_bits;     // 架构类型
    int cronloops;  // serverCron() 函数的运行次数计数器
    char runid[REDIS_RUN_ID_SIZE+1];   // 本服务器的 RUN ID 
    int sentinel_mode;       // 服务器是否运行在 SENTINEL 模式

    /* Networking */
    int port;    // TCP 监听端口
    int tcp_backlog;            /* TCP listen() backlog */
    char *bindaddr[REDIS_BINDADDR_MAX];   // 地址
    int bindaddr_count;     // 地址数量
    char *unixsocket;       // UNIX 套接字
    mode_t unixsocketperm;      /* UNIX socket permission */
    int ipfd[REDIS_BINDADDR_MAX];  // 描述符
    int ipfd_count;     // 描述符数量
    int sofd;     // UNIX 套接字文件描述符
    int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
    int cfd_count;              /* Used slots in cfd[] */
    list *clients;    // 一个链表,保存了所有客户端状态结构
    list *clients_to_close;   // 链表,保存了所有待关闭的客户端
    list *slaves, *monitors;   // 链表,保存了所有从服务器,以及所有监视器
    redisClient *current_client;    // 服务器的当前客户端,仅用于崩溃报告
    int clients_paused;         /* True if clients are currently paused */
    mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
    char neterr[ANET_ERR_LEN];     // 网络错误
    dict *migrate_cached_sockets; // MIGRATE 缓存

    /* RDB / AOF loading information */
    int loading;     // 这个值为真时,表示服务器正在进行载入
    off_t loading_total_bytes;     // 正在载入的数据的大小
    off_t loading_loaded_bytes;    // 已载入数据的大小
    time_t loading_start_time;  // 开始进行载入的时间
    off_t loading_process_events_interval_bytes;

    /* Fast pointers to often looked up command */
    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
                        *rpopCommand;     // 常用命令的快捷连接

    /* Fields used only for stats */
    time_t stat_starttime;     // 服务器启动时间
    long long stat_numcommands;     // 已处理命令的数量
    long long stat_numconnections;    // 服务器接到的连接请求数量
    long long stat_expiredkeys;  // 已过期的键数量
    long long stat_evictedkeys;    // 因为回收内存而被释放的过期键的数量
    long long stat_keyspace_hits;  // 成功查找键的次数
    long long stat_keyspace_misses;    // 查找键失败的次数
    size_t stat_peak_memory;   // 已使用内存峰值
    long long stat_fork_time;     // 最后一次执行 fork() 时消耗的时间
    long long stat_rejected_conn;    // 服务器因为客户端数量过多而拒绝客户端连接的次数
    long long stat_sync_full;  // 执行 full sync 的次数
    long long stat_sync_partial_ok;    // PSYNC 成功执行的次数
    long long stat_sync_partial_err;  // PSYNC 执行失败的次数

    /* slowlog */
    list *slowlog; // 保存了所有慢查询日志的链表
    long long slowlog_entry_id;     // 下一条慢查询日志的 ID
    long long slowlog_log_slower_than; // 服务器配置 slowlog-log-slower-than 选项的值
    unsigned long slowlog_max_len;    // 服务器配置 slowlog-max-len 选项的值
    size_t resident_set_size;       /* RSS sampled in serverCron(). */

    /* The following two are used to track instantaneous "load" in terms
     * of operations per second. */
    long long ops_sec_last_sample_time;   // 最后一次进行抽样的时间
    long long ops_sec_last_sample_ops;   // 最后一次抽样时,服务器已执行命令的数量
    long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];  // 抽样结果
    int ops_sec_idx; // 数组索引,用于保存抽样结果,并在需要时回绕到 0

    /* Configuration */
    int verbosity;      // 日志可见性
    int maxidletime;           // 客户端最大空转时间
    int tcpkeepalive;       // 是否开启 SO_KEEPALIVE 选项
    int active_expire_enabled;      /* Can be disabled for testing purposes. */
    size_t client_max_querybuf_len; /* Limit for client query buffer length */
    int dbnum;                      /* Total number of configured DBs */
    int daemonize;                  /* True if running as a daemon */
    clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
    // 客户端输出缓冲区大小限制
    // 数组的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 个
    // 每个代表一类客户端:普通、从服务器、pubsub,诸如此类

    /* AOF persistence */
    int aof_state;    // AOF 状态(开启/关闭/可写)
    int aof_fsync;       // 所使用的 fsync 策略(每个写入/每秒/从不) 
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;  // 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小
    off_t aof_current_size;   // AOF 文件的当前字节大小
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    pid_t aof_child_pid;     // 负责进行 AOF 重写的子进程 ID
    list *aof_rewrite_buf_blocks;   // AOF 重写缓存链表,链接着多个缓存块
    sds aof_buf;       // AOF 缓冲区
    int aof_fd;       // AOF 文件的描述符
    int aof_selected_db;  // AOF 的当前目标数据库
    time_t aof_flush_postponed_start;  // 推迟 write 操作的时间
    time_t aof_last_fsync;     // 最后一直执行 fsync 的时间
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
    time_t aof_rewrite_time_start;    // AOF 重写的开始时间
    int aof_lastbgrewrite_status;   // 最后一次执行 BGREWRITEAOF 的结果
    unsigned long aof_delayed_fsync;   // 记录 AOF 的 write 操作被推迟了多少次
    int aof_rewrite_incremental_fsync;  // 指示是否需要每写入一定量的数据,就主动执行一次 fsync()
    int aof_last_write_status;      /* REDIS_OK or REDIS_ERR */
    int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */

    /* RDB persistence */
    long long dirty;      // 自从上次 SAVE 执行以来,数据库被修改的次数
    long long dirty_before_bgsave;   // BGSAVE 执行前的数据库被修改次数
    pid_t rdb_child_pid;    // 负责执行 BGSAVE 的子进程的 ID,没在执行 BGSAVE 时,设为 -1
    struct saveparam *saveparams;   /* Save points array for RDB */
    int saveparamslen;              /* Number of saving points */
    char *rdb_filename;             /* Name of RDB file */
    int rdb_compression;            /* Use compression in RDB? */
    int rdb_checksum;               /* Use RDB checksum? */
    time_t lastsave;       // 最后一次完成 SAVE 的时间
    time_t lastbgsave_try;        // 最后一次尝试执行 BGSAVE 的时间
    time_t rdb_save_time_last;        // 最近一次 BGSAVE 执行耗费的时间
    time_t rdb_save_time_start;       // 数据库最近一次开始执行 BGSAVE 的时间
    int lastbgsave_status;          // 最后一次执行 SAVE 的状态
    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */

    /* Propagation of commands in AOF / replication */
    redisOpArray also_propagate;    /* Additional command to propagate. */

    /* Logging */
    char *logfile;                  /* Path of log file */
    int syslog_enabled;             /* Is syslog enabled? */
    char *syslog_ident;             /* Syslog ident */
    int syslog_facility;            /* Syslog facility */

    /* Replication (master) */
    int slaveseldb;                 /* Last SELECTed DB in replication output */
    long long master_repl_offset;      // 全局复制偏移量(一个累计值)
    int repl_ping_slave_period;   // 主服务器发送 PING 的频率
    char *repl_backlog;      // backlog 本身
    long long repl_backlog_size;       // backlog 的长度
    long long repl_backlog_histlen;   // backlog 中数据的长度
    long long repl_backlog_idx;      // backlog 的当前索引
    long long repl_backlog_off;      // backlog 中可以被还原的第一个字节的偏移量
    time_t repl_backlog_time_limit;   // backlog 的过期时间
    time_t repl_no_slaves_since;     // 距离上一次有从服务器的时间,是否开启最小数量从服务器写入功能
    int repl_min_slaves_to_write;   /* Min number of slaves to write. */
    int repl_min_slaves_max_lag;    // 定义最小数量从服务器的最大延迟值
    int repl_good_slaves_count;   // 延迟良好的从服务器的数量

    /* Replication (slave) */
    char *masterauth;        // 主服务器的验证密码
    char *masterhost;     // 主服务器的地址
    int masterport;       // 主服务器的端口
    int repl_timeout;       // 超时时间
    redisClient *master;     // 主服务器所对应的客户端
    redisClient *cached_master;   // 被缓存的主服务器,PSYNC 时使用
    int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
    int repl_state;        // 复制的状态(服务器是从服务器时使用)
    off_t repl_transfer_size;    // RDB 文件的大小*/
    off_t repl_transfer_read;     // 已读 RDB 文件内容的字节数
    off_t repl_transfer_last_fsync_off;     // 最近一次执行 fsync 时的偏移量,用于 sync_file_range 函数
    int repl_transfer_s;    // 主服务器的套接字
    int repl_transfer_fd;  // 保存 RDB 文件的临时文件的描述符
    char *repl_transfer_tmpfile;    // 保存 RDB 文件的临时文件名字
    time_t repl_transfer_lastio;     // 最近一次读入 RDB 内容的时间
    int repl_serve_stale_data; /* Serve stale data when link is down? */
    int repl_slave_ro;      // 是否只读从服务器?
    time_t repl_down_since;    // 连接断开的时长
    int repl_disable_tcp_nodelay;     // 是否要在 SYNC 之后关闭 NODELAY ?
    int slave_priority;     // 从服务器优先级
    char repl_master_runid[REDIS_RUN_ID_SIZE+1];   // 本服务器(从服务器)当前主服务器的 RUN ID
    long long repl_master_initial_offset;   // 初始化偏移量

    /* Replication script cache. */
    dict *repl_scriptcache_dict;        /* SHA1 all slaves are aware of. */
    list *repl_scriptcache_fifo;    // FIFO 队列
    int repl_scriptcache_size;      // 缓存的大小

    /* Synchronous replication. */
    list *clients_waiting_acks;         /* Clients waiting in WAIT command. */
    int get_ack_from_slaves;            /* If true we send REPLCONF GETACK. */
    /* Limits */
    int maxclients;                 /* Max number of simultaneous clients */
    unsigned long long maxmemory;   /* Max number of memory bytes to use */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Pricision of random sampling */

    /* Blocked clients */
    unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
    list *unblocked_clients; /* list of clients to unblock before next loop */
    list *ready_keys;        /* List of readyList structures for BLPOP & co */

    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    int sort_store;

    /* Zip structure config, see redis.conf for more information  */
    size_t hash_max_ziplist_entries;
    size_t hash_max_ziplist_value;
    size_t list_max_ziplist_entries;
    size_t list_max_ziplist_value;
    size_t set_max_intset_entries;
    size_t zset_max_ziplist_entries;
    size_t zset_max_ziplist_value;
    size_t hll_sparse_max_bytes;
    time_t unixtime;        /* Unix time sampled every cron cycle. */
    long long mstime;       /* Like 'unixtime' but with milliseconds resolution. */


    /* Pubsub */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    // 字典,键为频道,值为链表
    // 链表中保存了所有订阅某个频道的客户端
    // 新客户端总是被添加到链表的表尾
    list *pubsub_patterns;   // 这个链表记录了客户端订阅的所有模式的名字
    int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                                   xor of REDIS_NOTIFY... flags. */

    /* Cluster */
    int cluster_enabled;      /* Is cluster enabled? */
    mstime_t cluster_node_timeout; /* Cluster node timeout. */
    char *cluster_configfile; /* Cluster auto-generated config file name. */
    struct clusterState *cluster;  /* State of the cluster */
    int cluster_migration_barrier; /* Cluster replicas migration barrier. */
  
    /* Scripting */
    lua_State *lua;   // Lua 环境
    redisClient *lua_client;   // 复制执行 Lua 脚本中的 Redis 命令的伪客户端
    redisClient *lua_caller;     // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL
    dict *lua_scripts;     // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和
    mstime_t lua_time_limit;     // Lua 脚本的执行时限
    mstime_t lua_time_start;   // 脚本开始执行的时间
    int lua_write_dirty;  // 脚本是否执行过写命令
    int lua_random_dirty; // 脚本是否执行过带有随机性质的命令
    int lua_timedout;      // 脚本是否超时
    int lua_kill;      // 是否要杀死脚本

    /* Assert & bug reporting */
    char *assert_failed;
    char *assert_file;
    int assert_line;
    int bug_report_start; /* True if bug report header was already logged. */
    int watchdog_period;  /* Software watchdog period in ms. 0 = off */
};

命令执行过程:

  1. 客户端向服务器发送命令请求;
  2. 服务器接收并处理客户端发来的命令请求,在数据库中进行设置操作,并产生命令回复;
  3. 服务器将命令回复发送给客户端;
  4. 客户端接收服务器返回的命令回复,并将这个回复打印给用户。

1、发送命令请求
客户端将命令转换成协议格式,并通过连接到服务器的套接字发送给服务器。

image

2、读取命令请求
服务器将命令请求读到输入缓冲区中,并对缓冲区中的请求进行分析,提取命令请求中的参数、参数个数,保存到argv和argc中。

image

接着调用命令执行器:
(1)查找命令实现
根据客户端状态的argv[0]参数(命令名),在命令表中查询所指定的命令,并将命令保存到客户端状态结构的cmd属性中。

image

命令表示一个字典,键为命令名字,值是一个redisCommand结构:


image

sflags是标志符,功能如下:


image

(2)执行预备操作
在执行真正的命令之前,还需要做一些预备操作,确保命令可以正确的运行:

(3)调用命令实现函数
cmd属性指向的结构中包含了命令实现函数:


image

调用该命令实现函数,并产生相应的命令回复,保存在输出缓冲区中,再通过与客户端相连的套接字返回给客户端。

(4)执行后续工作

上一篇下一篇

猜你喜欢

热点阅读