t_zset.c
2019-04-18 本文已影响0人
生命就是个Bug
Redis 的 `t_zset.c` 是对 zset 数据结构的实现。
（skiplist 编码下的）zset 由 dict 和 zskiplist 组合实现。
当元素较少的时候，采用 ziplist 来实现 zset；当元素较多的时候，采用 skiplist 来实现 zset。
在 `redis.conf` 中存在如下配置：
# Similarly to hashes and lists, sorted sets are also specially encoded in
# order to save a lot of space. This encoding is only used when the length and
# elements of a sorted set are below the following limits:
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
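这两个参数定义了 ziplist 转为 skiplist 的边界：当元素个数超过 `zset-max-ziplist-entries`，或某个 member 的长度超过 `zset-max-ziplist-value` 字节时，zset 改用 skiplist 编码。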
`zset` 的定义如下：
/* Sorted set, skiplist encoding: a dict and a skiplist that hold the same
 * members (the member SDS string is shared between the two structures). */
typedef struct zset {
    /* member -> score map; the value is read as a pointer to double
     * (`*(double *) dictGetVal(de)`). */
    dict *dict;
    /* Skiplist ordered by score, ties broken by member. */
    zskiplist *zsl;
} zset;
它包含了一个 `dict` 和一个 `zsl`（跳跃表）。
`dict` 的 key 存储的是 member；`dict` 的 value 存储的是指向 score 的指针（代码中通过 `*(double *) dictGetVal(de)` 读取）。
`zskiplist` 的定义如下：
/* Skiplist used by the skiplist encoding of sorted sets. */
typedef struct zskiplist {
    /* header: sentinel node created by zslCreate() with ZSKIPLIST_MAXLEVEL
     * levels; tail: last real node, or NULL when the list is empty. */
    struct zskiplistNode *header, *tail;
    /* Number of nodes, not counting the header sentinel. */
    unsigned long length;
    /* Number of levels currently in use (at least 1). */
    int level;
} zskiplist;
`zskiplistNode` 的定义如下：
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    /* Member string; the skiplist takes ownership of it on insert. */
    sds ele;
    /* Score; nodes are ordered by score, ties broken by sdscmp() on ele. */
    double score;
    /* Previous node on level 0; NULL for the first node (never the header). */
    struct zskiplistNode *backward;
    /* Per-level links, sized at allocation time (flexible array member). */
    struct zskiplistLevel {
        /* Next node on this same level, or NULL. */
        struct zskiplistNode *forward;
        /* Distance, in level-0 positions, from this node to 'forward';
         * summed along a search path to compute ranks. */
        unsigned long span;
    } level[];
} zskiplistNode;
zset 的结构图如下：

![zset 结构图](zset.png)
1. 创建跳跃表
/* Create a new skiplist. */
//创建一个跳跃表zskiplist
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//一个跳跃表包含了:
// zskiplistNode header
// zskiplistNode tail
// int level
// int length
//分配内存
zsl = zmalloc(sizeof(*zsl));
//初始层为1
zsl->level = 1;
//初始长度为0
zsl->length = 0;
//zskiplistNode包含了score、ele、backward、zskiplistLevel[]
//创建一个ZSKIPLIST_MAXLEVEL层,分值为0,属性为null的节点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL, 0, NULL);
//zskiplistLevel数组中每个level包含了forward和span
//初始化level数组
//每层的forward为null,span为0
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
//backward为null
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
`zskiplist` 的组成部分包含：
- 头尾节点指针 `header`、`tail`（类型为 `zskiplistNode *`）
- 节点总个数 `length`（不含头节点）
- 当前最大层数 `level`
2. 创建跳跃表节点
/* Allocate a skiplist node whose trailing level[] array has room for
 * 'level' entries, and store 'score' and 'ele' in it.
 *
 * Only ele and score are set here: backward and the level[] entries are
 * not touched and must be filled in by the caller. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *node;
    size_t levelbytes = level * sizeof(struct zskiplistLevel);

    node = zmalloc(sizeof(*node) + levelbytes);
    node->ele = ele;
    node->score = score;
    return node;
}
`zskiplistNode` 的组成部分包含：
- 成员 member：`ele`
- 分数：`score`
- 后退指针（指向第 0 层上的前一个节点）：`backward`
- 层：`zskiplistLevel` 数组 `level[]`

`zskiplistLevel` 的组成部分包含：
- 前进指针（指向同一层的下一个节点）：`forward`
- 跨度：`span`
3. 插入元素
/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
/* Returns the new node. 'score' must not be NaN (asserted below). */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    /* update[i]: on level i, the last node that sorts before the new one. */
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    /* rank[i]: rank of update[i], i.e. the sum of the spans crossed to reach
     * it from the header (the header itself has rank 0). */
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
    serverAssert(!isnan(score));
    x = zsl->header;
    /* Descend from the highest level in use down to level 0, recording in
     * update[] where the new node must be linked on every level. */
    for (i = zsl->level - 1; i >= 0; i--) {
        /* The top level starts at the header (rank 0). Every lower level
         * continues from the node reached on the level above, so it starts
         * from that level's accumulated rank. After the loop, rank[0] is the
         * rank of the new node's level-0 predecessor; it is used below to
         * compute the spans. */
        rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];
        /* Move forward while the next node sorts before (score, ele): it has
         * a lower score, or the same score and a member that sdscmp() orders
         * first. */
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            /* A node's rank is the sum of the spans crossed to reach it:
             *   a------>c     one hop with span 2
             *   a-->b-->c     two hops with span 1 each */
            rank[i] += x->level[i].span;
            /* Step to the next node on the same level. */
            x = x->level[i].forward;
        }
        /* Remember the predecessor on this level. */
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    /* Pick the height of the new node. */
    level = zslRandomLevel();
    /* If the new node is taller than the list, initialize the extra levels. */
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            /* These levels contain only the header, so the predecessor is
             * the header itself, at rank 0... */
            rank[i] = 0;
            update[i] = zsl->header;
            /* ...and its span is set to the current list length; the linking
             * loop below splits it around the new node. */
            update[i]->level[i].span = zsl->length;
        }
        /* The list now uses 'level' levels. */
        zsl->level = level;
    }
    /* Create the node with 'level' levels. */
    x = zslCreateNode(level, score, ele);
    /* Link the node on levels 0 .. level-1 and fix the spans. */
    for (i = 0; i < level; i++) {
        /* Splice x in right after update[i] on this level. */
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* header                update[i]     x    update[i]->forward
           |-----------|-----------|-----------|-----------|-----------|-----------|
                                   |<---update[i].span---->|
           |<-------rank[i]------->|
           |<-------------------rank[0]------------------->|
        */
        /* rank[0] - rank[i] is the distance from update[i] to update[0], and
         * x lands one position after update[0]. x therefore keeps the part
         * of update[i]'s old span that lies beyond update[0]... */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        /* ...and update[i] now reaches x, which is one step past update[0]. */
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    /* Levels above the new node's height are not linked to it, but their
     * spans now cross one more element. */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    /* Level-0 back link: the first node gets NULL rather than the header. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    /* Fix the successor's back link, or make x the new tail. */
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
4. 删除元素
/* zslDeleteNode() is defined after this function in this file; declare it
 * here so the call below does not rely on an implicit function declaration. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update);

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    /* update[i]: on level i, the last node that sorts before (score, ele). */
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    /* Descend from the top level, recording the predecessor on every level
     * (same ordering rule as zslInsert(): score, then sdscmp() on member). */
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) < 0))) {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele, ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
/* Unlinks node 'x' from the skiplist without freeing it. 'update[i]' must be
 * the last node on level i that sorts before 'x', for every level in use. */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    /* Fix forward pointers and spans on every level in use. */
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            /* x is linked on this level: the predecessor now jumps straight
             * to x's successor, covering x's span minus x itself. */
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            /* x is not linked on this level, but the predecessor's span
             * covers it, so the span shrinks by one. */
            update[i]->level[i].span -= 1;
        }
    }
    /* Fix the level-0 back link of x's successor, or move the tail back. */
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    /* Drop top levels that no longer contain any node (keep at least one). */
    while (zsl->level > 1 && zsl->header->level[zsl->level - 1].forward == NULL)
        zsl->level--;
    zsl->length--;
}
5. ZADD、ZINCRBY命令
/* zaddGenericCommand() is defined below; declare it so these wrappers do not
 * rely on an implicit function declaration. */
void zaddGenericCommand(client *c, int flags);

/* ZADD key [NX|XX] [CH] [INCR] score member [score member ...] */
void zaddCommand(client *c) {
    zaddGenericCommand(c, ZADD_NONE);
}

/* ZINCRBY key increment member: the ZADD code path with INCR forced on. */
void zincrbyCommand(client *c) {
    zaddGenericCommand(c, ZADD_INCR);
}
/* This generic command implements both ZADD and ZINCRBY. */
/* 'flags' carries the ZADD_* options preset by the caller; options parsed
 * from argv are OR-ed into it. */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    /* The key name is always argv[1]. */
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0; /* Number of new elements added. */
    int updated = 0; /* Number of elements with updated score. */
    int processed = 0; /* Number of elements processed, may remain zero with
                           options like XX. */
    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    /* Consume leading NX / XX / CH / INCR options (case-insensitive); the
     * first argument that is not an option starts the score-member pairs. */
    while (scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt, "nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt, "xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt, "ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt, "incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }
    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;
    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. */
    elements = c->argc - scoreidx;
    if (elements % 2 || !elements) {
        addReply(c, shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */
    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }
    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    /* getDoubleFromObjectOrReply() sends the error reply itself on failure. */
    scores = zmalloc(sizeof(double) * elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c, c->argv[scoreidx + j * 2], &scores[j], NULL)
            != C_OK)
            goto cleanup;
    }
    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db, key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* Start with the skiplist encoding when the ziplist encoding is
         * disabled (zset_max_ziplist_entries == 0) or when the first member
         * is longer than zset_max_ziplist_value bytes; otherwise start with
         * the ziplist encoding. Only the first member is checked here;
         * presumably zsetAdd() converts the encoding for later pairs
         * (TODO confirm in zsetAdd()). */
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx + 1]->ptr)) {
            zobj = createZsetObject();
        }
        else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db, key, zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c, shared.wrongtypeerr);
            goto cleanup;
        }
    }
    /* Add or update every score-member pair. */
    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        /* zsetAdd() receives the command flags and reports what it did
         * through the same variable (ZADD_ADDED / ZADD_UPDATED / ZADD_NOP). */
        int retflags = flags;
        /* The member follows its score in argv. */
        ele = c->argv[scoreidx + 1 + j * 2]->ptr;
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        /* A zero return is reported to the client as a NaN result. */
        if (retval == 0) {
            addReplyError(c, nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        /* Keep the resulting score: in INCR mode it is the reply value. */
        score = newscore;
    }
    server.dirty += (added + updated);
reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c, score);
        else
            addReply(c, shared.nullbulk);
    } else { /* ZADD. */
        addReplyLongLong(c, ch ? added + updated : added);
    }
cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db, key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}
6. zrem命令
/* ZREM key member [member ...]
 *
 * Removes the given members and replies with how many of them existed.
 * The key is deleted as soon as the sorted set becomes empty; the
 * remaining arguments are then ignored. */
void zremCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    int deleted = 0;
    int keyremoved = 0;

    zobj = lookupKeyWriteOrReply(c, key, shared.czero);
    if (zobj == NULL || checkType(c, zobj, OBJ_ZSET)) return;

    /* Stop as soon as the key has been deleted: zobj must not be touched
     * after dbDelete() (presumably it is released together with the key). */
    for (int arg = 2; arg < c->argc && !keyremoved; arg++) {
        if (zsetDel(zobj, c->argv[arg]->ptr)) deleted++;
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db, key);
            keyremoved = 1;
        }
    }

    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_ZSET, "zrem", key, c->db->id);
        if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
        signalModifiedKey(c->db, key);
        server.dirty += deleted;
    }
    addReplyLongLong(c, deleted);
}
/* Delete the element 'ele' from the sorted set, returning 1 if the element
 * existed and was deleted, 0 otherwise (the element was not there).
 * Handles both the ziplist and the skiplist (dict + zskiplist) encodings;
 * any other encoding panics. */
int zsetDel(robj *zobj, sds ele) {
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        dictEntry *de = dictUnlink(zs->dict, ele);
        double score;

        if (de == NULL) return 0; /* No such element found. */

        /* Read the score before the dict entry is released: it is needed
         * to locate the node in the skiplist. */
        score = *(double *) dictGetVal(de);

        /* The member SDS string is shared by the dict and the skiplist, and
         * the skiplist deletion is what actually releases it, so the dict
         * entry goes first and the skiplist node last. */
        dictFreeUnlinkedEntry(zs->dict, de);
        int retval = zslDelete(zs->zsl, score, ele, NULL);
        serverAssert(retval);

        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
        return 1;
    } else if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr = zzlFind(zobj->ptr, ele, NULL);

        if (eptr == NULL) return 0; /* No such element found. */
        zobj->ptr = zzlDelete(zobj->ptr, eptr);
        return 1;
    }

    serverPanic("Unknown sorted set encoding");
    return 0;
}
7. zremrangebyrank、zremrangebyscore、zremrangebylex命令
/* Range types understood by zremrangeGenericCommand(). They must be defined
 * before the wrappers below use them. The values also index the keyspace
 * event name table inside zremrangeGenericCommand(), so keep them 0, 1, 2. */
#define ZRANGE_RANK 0
#define ZRANGE_SCORE 1
#define ZRANGE_LEX 2

/* zremrangeGenericCommand() is defined below; declare it so these wrappers
 * do not rely on an implicit function declaration. */
void zremrangeGenericCommand(client *c, int rangetype);

/* ZREMRANGEBYRANK key start stop
 * Remove the elements whose rank lies in the given range. */
void zremrangebyrankCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_RANK);
}

/* ZREMRANGEBYSCORE key min max
 * Remove the elements whose score lies in the given range. */
void zremrangebyscoreCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_SCORE);
}

/* ZREMRANGEBYLEX key min max
 * Remove the elements within the given lexicographical range. */
void zremrangebylexCommand(client *c) {
    zremrangeGenericCommand(c, ZRANGE_LEX);
}
/* Shared implementation of ZREMRANGEBYRANK / ZREMRANGEBYSCORE /
 * ZREMRANGEBYLEX. 'rangetype' is ZRANGE_RANK, ZRANGE_SCORE or ZRANGE_LEX;
 * argv[2] and argv[3] hold the two range bounds. Replies with the number
 * of removed elements and deletes the key if the set becomes empty. */
void zremrangeGenericCommand(client *c, int rangetype) {
    robj *key = c->argv[1];
    robj *zobj;
    int keyremoved = 0;
    unsigned long deleted = 0;
    zrangespec range;
    zlexrangespec lexrange;
    long start, end, llen;
    /* Step 1: Parse the range. */
    /* ZREMRANGEBYRANK key start stop: integer ranks. */
    if (rangetype == ZRANGE_RANK) {
        if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
            (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
            return;
    }
    /* ZREMRANGEBYSCORE key min max */
    else if (rangetype == ZRANGE_SCORE) {
        if (zslParseRange(c->argv[2], c->argv[3], &range) != C_OK) {
            addReplyError(c, "min or max is not a float");
            return;
        }
    }
    /* ZREMRANGEBYLEX key min max. Once parsed, lexrange is released with
     * zslFreeLexRange() at 'cleanup'. */
    else if (rangetype == ZRANGE_LEX) {
        if (zslParseLexRange(c->argv[2], c->argv[3], &lexrange) != C_OK) {
            addReplyError(c, "min or max not valid string range item");
            return;
        }
    }
    /* Step 2: Lookup & range sanity checks if needed. */
    if ((zobj = lookupKeyWriteOrReply(c, key, shared.czero)) == NULL ||
        checkType(c, zobj, OBJ_ZSET))
        goto cleanup;
    /* Normalize rank indexes: negative values count from the end, then
     * clamp to [0, llen-1]. */
    if (rangetype == ZRANGE_RANK) {
        /* Sanitize indexes. */
        llen = zsetLength(zobj);
        if (start < 0) start = llen + start;
        if (end < 0) end = llen + end;
        if (start < 0) start = 0;
        /* Invariant: start >= 0, so this test will be true when end < 0.
         * The range is empty when start > end or start >= length. */
        if (start > end || start >= llen) {
            addReply(c, shared.czero);
            goto cleanup;
        }
        if (end >= llen) end = llen - 1;
    }
    /* Step 3: Perform the range deletion operation. */
    /* Ziplist encoding; the rank helpers take 1-based ranks. */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        switch (rangetype) {
            case ZRANGE_RANK:
                zobj->ptr = zzlDeleteRangeByRank(zobj->ptr, start + 1, end + 1, &deleted);
                break;
            case ZRANGE_SCORE:
                zobj->ptr = zzlDeleteRangeByScore(zobj->ptr, &range, &deleted);
                break;
            case ZRANGE_LEX:
                zobj->ptr = zzlDeleteRangeByLex(zobj->ptr, &lexrange, &deleted);
                break;
        }
        if (zzlLength(zobj->ptr) == 0) {
            dbDelete(c->db, key);
            keyremoved = 1;
        }
    }
    /* Skiplist encoding; the helpers also receive the dict, presumably to
     * remove the members from it as well (TODO confirm). */
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        switch (rangetype) {
            case ZRANGE_RANK:
                deleted = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict);
                break;
            case ZRANGE_SCORE:
                deleted = zslDeleteRangeByScore(zs->zsl, &range, zs->dict);
                break;
            case ZRANGE_LEX:
                deleted = zslDeleteRangeByLex(zs->zsl, &lexrange, zs->dict);
                break;
        }
        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
        if (dictSize(zs->dict) == 0) {
            dbDelete(c->db, key);
            keyremoved = 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    /* Step 4: Notifications and reply. */
    if (deleted) {
        /* Indexed by rangetype (ZRANGE_RANK / ZRANGE_SCORE / ZRANGE_LEX). */
        char *event[3] = {"zremrangebyrank", "zremrangebyscore", "zremrangebylex"};
        signalModifiedKey(c->db, key);
        notifyKeyspaceEvent(NOTIFY_ZSET, event[rangetype], key, c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
    }
    server.dirty += deleted;
    addReplyLongLong(c, deleted);
cleanup:
    if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
}
8. zrank、zrevrank命令
/* zrankGenericCommand() is defined below; declare it so these wrappers do
 * not rely on an implicit function declaration. */
void zrankGenericCommand(client *c, int reverse);

/* ZRANK key member: 0-based rank, lowest score first. */
void zrankCommand(client *c) {
    zrankGenericCommand(c, 0);
}

/* ZREVRANK key member: 0-based rank, highest score first. */
void zrevrankCommand(client *c) {
    zrankGenericCommand(c, 1);
}
/* Shared implementation of ZRANK / ZREVRANK.
 *
 * Replies with the member's 0-based rank (counted from the highest score
 * when 'reverse' is non-zero), or a null bulk when the key or the member
 * does not exist. */
void zrankGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *ele = c->argv[2];
    robj *zobj = lookupKeyReadOrReply(c, key, shared.nullbulk);
    long rank;

    if (zobj == NULL) return;
    if (checkType(c, zobj, OBJ_ZSET)) return;
    serverAssertWithInfo(c, ele, sdsEncodedObject(ele));

    rank = zsetRank(zobj, ele->ptr, reverse);
    if (rank < 0) {
        addReply(c, shared.nullbulk);
        return;
    }
    addReplyLongLong(c, rank);
}
/* Given a sorted set object returns the 0-based rank of the object or
 * -1 if the object does not exist.
 *
 * For rank we mean the position of the element in the sorted collection
 * of elements. So the first element has rank 0, the second rank 1, and so
 * forth up to length-1 elements.
 *
 * If 'reverse' is false, the rank is returned considering as first element
 * the one with the lowest score. Otherwise if 'reverse' is non-zero
 * the rank is computed considering as element with rank 0 the one with
 * the highest score. */
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen = zsetLength(zobj);
    unsigned long rank;

    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        /* Look the score up in the dict, then ask the skiplist for the
         * 1-based rank of (score, member). */
        zset *zs = zobj->ptr;
        dictEntry *de = dictFind(zs->dict, ele);
        double score;

        if (de == NULL) return -1;
        score = *(double *) dictGetVal(de);
        rank = zslGetRank(zs->zsl, score, ele);
        /* Existing elements always have a rank. */
        serverAssert(rank != 0);
        return reverse ? llen - rank : rank - 1;
    } else if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        /* Linear scan over the member/score pairs, counting 1-based
         * positions until the member matches. */
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;

        eptr = ziplistIndex(zl, 0);
        serverAssert(eptr != NULL);
        sptr = ziplistNext(zl, eptr);
        serverAssert(sptr != NULL);

        for (rank = 1; eptr != NULL; rank++) {
            if (ziplistCompare(eptr, (unsigned char *) ele, sdslen(ele)))
                break;
            zzlNext(zl, &eptr, &sptr);
        }
        if (eptr == NULL) return -1;
        return reverse ? llen - rank : rank - 1;
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
9. zpopmin、zpopmax命令
/* ZPOPMIN key [count]
 * Pop up to 'count' (default 1) members with the lowest scores. */
void zpopminCommand(client *c) {
    robj *countarg = NULL;

    if (c->argc > 3) {
        addReply(c, shared.syntaxerr);
        return;
    }
    if (c->argc == 3) countarg = c->argv[2];
    genericZpopCommand(c, &c->argv[1], 1, ZSET_MIN, 0, countarg);
}

/* ZPOPMAX key [count]
 * Pop up to 'count' (default 1) members with the highest scores. */
void zpopmaxCommand(client *c) {
    robj *countarg = NULL;

    if (c->argc > 3) {
        addReply(c, shared.syntaxerr);
        return;
    }
    if (c->argc == 3) countarg = c->argv[2];
    genericZpopCommand(c, &c->argv[1], 1, ZSET_MAX, 0, countarg);
}
/* This command implements the generic zpop operation, used by:
 * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
 * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
 *
 * If 'emitkey' is true also the key name is emitted, useful for the blocking
 * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
 *
 * The synchronous version instead does not need to emit the key, but may
 * use the 'count' argument to return multiple items if available.
 *
 * Only the first existing key among keyv[0..keyc-1] is popped from. The
 * reply is a flat array of member, score pairs, preceded by the key name
 * when 'emitkey' is set. */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
    int idx;
    robj *key = NULL;
    robj *zobj = NULL;
    sds ele;
    double score;
    long count = 1;
    /* If a count argument was passed, parse it or return an error. A
     * non-positive count replies with an empty array and pops nothing. */
    if (countarg) {
        if (getLongFromObjectOrReply(c, countarg, &count, NULL) != C_OK)
            return;
        if (count <= 0) {
            addReply(c, shared.emptymultibulk);
            return;
        }
    }
    /* Check type and break on the first error, otherwise identify candidate.
     * Missing keys are skipped; on a wrong type checkType() has already sent
     * the error reply. */
    idx = 0;
    while (idx < keyc) {
        key = keyv[idx++];
        zobj = lookupKeyWrite(c->db, key);
        if (!zobj) continue;
        if (checkType(c, zobj, OBJ_ZSET)) return;
        break;
    }
    /* No candidate for zpopping, return empty. */
    if (!zobj) {
        addReply(c, shared.emptymultibulk);
        return;
    }
    /* The array length is only known after popping: reserve it now and
     * fill it in at the end. */
    void *arraylen_ptr = addDeferredMultiBulkLength(c);
    long arraylen = 0;
    /* We emit the key only for the blocking variant. */
    if (emitkey) addReplyBulk(c, key);
    /* Pop up to 'count' elements, stopping early if the set becomes empty. */
    do {
        /* Ziplist encoding: entries are stored as member, score pairs. */
        if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
            unsigned char *zl = zobj->ptr;
            unsigned char *eptr, *sptr;
            unsigned char *vstr;
            unsigned int vlen;
            long long vlong;
            /* Get the first or last element in the sorted set: index 0 is
             * the first member, index -2 the member of the last pair. */
            eptr = ziplistIndex(zl, where == ZSET_MAX ? -2 : 0);
            serverAssertWithInfo(c, zobj, eptr != NULL);
            serverAssertWithInfo(c, zobj, ziplistGet(eptr, &vstr, &vlen, &vlong));
            /* Integer-encoded members come back in vlong; either way, copy
             * the member into a fresh sds. */
            if (vstr == NULL)
                ele = sdsfromlonglong(vlong);
            else
                ele = sdsnewlen(vstr, vlen);
            /* Get the score. */
            sptr = ziplistNext(zl, eptr);
            serverAssertWithInfo(c, zobj, sptr != NULL);
            score = zzlGetScore(sptr);
        }
        /* Skiplist encoding: the tail is the highest element and the
         * header's level-0 successor the lowest. */
        else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *zs = zobj->ptr;
            zskiplist *zsl = zs->zsl;
            zskiplistNode *zln;
            zln = (where == ZSET_MAX ? zsl->tail :
                   zsl->header->level[0].forward);
            /* There must be an element in the sorted set. */
            serverAssertWithInfo(c, zobj, zln != NULL);
            /* Copy the member: zsetDel() below releases the node's sds. */
            ele = sdsdup(zln->ele);
            score = zln->score;
        } else {
            serverPanic("Unknown sorted set encoding");
        }
        /* The deletion happens inside the assertion expression, so this
         * relies on serverAssertWithInfo() always evaluating it. */
        serverAssertWithInfo(c, zobj, zsetDel(zobj, ele));
        server.dirty++;
        if (arraylen == 0) { /* Do this only for the first iteration. */
            /* Indexed by 'where'; assumes ZSET_MIN == 0 and ZSET_MAX == 1
             * (defined elsewhere, TODO confirm). */
            char *events[2] = {"zpopmin", "zpopmax"};
            notifyKeyspaceEvent(NOTIFY_ZSET, events[where], key, c->db->id);
            signalModifiedKey(c->db, key);
        }
        addReplyBulkCBuffer(c, ele, sdslen(ele));
        addReplyDouble(c, score);
        sdsfree(ele);
        arraylen += 2;
        /* The set is now empty: delete the key and stop popping. */
        if (zsetLength(zobj) == 0) {
            dbDelete(c->db, key);
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
            break;
        }
    } while (--count);
    /* One more array element for the key name when it was emitted. */
    setDeferredMultiBulkLength(c, arraylen_ptr, arraylen + (emitkey != 0));
}
10. bzpopmin、bzpopmax命令
/* blockingGenericZpopCommand() is defined below; declare it so these
 * wrappers do not rely on an implicit function declaration. */
void blockingGenericZpopCommand(client *c, int where);

/* BZPOPMIN key [key ...] timeout */
void bzpopminCommand(client *c) {
    blockingGenericZpopCommand(c, ZSET_MIN);
}

/* BZPOPMAX key [key ...] timeout */
void bzpopmaxCommand(client *c) {
    blockingGenericZpopCommand(c, ZSET_MAX);
}
/* BZPOPMIN / BZPOPMAX actual implementation.
 *
 * argv[1 .. argc-2] are the keys and argv[argc-1] is the timeout. The first
 * non-empty sorted set among the keys is popped right away, and the command
 * is rewritten as ZPOPMIN / ZPOPMAX for propagation. If every key is missing
 * or empty, the client blocks; inside MULTI it gets a null reply instead. */
void blockingGenericZpopCommand(client *c, int where) {
    mstime_t timeout;
    int timeoutidx = c->argc - 1;

    if (getTimeoutFromObjectOrReply(c, c->argv[timeoutidx], &timeout, UNIT_SECONDS)
        != C_OK)
        return;

    for (int j = 1; j < timeoutidx; j++) {
        robj *o = lookupKeyWrite(c->db, c->argv[j]);

        if (o == NULL) continue;
        if (o->type != OBJ_ZSET) {
            addReply(c, shared.wrongtypeerr);
            return;
        }
        if (zsetLength(o) == 0) continue;

        /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
        genericZpopCommand(c, &c->argv[j], 1, where, 1, NULL);
        /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
        rewriteClientCommandVector(c, 2,
            where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
            c->argv[j]);
        return;
    }

    /* If we are inside a MULTI/EXEC and the zset is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & CLIENT_MULTI) {
        addReply(c, shared.nullmultibulk);
        return;
    }

    /* No key had anything to pop: block on all of them. */
    blockForKeys(c, BLOCKED_ZSET, c->argv + 1, c->argc - 2, timeout, NULL, NULL);
}