大数据技术干货数据结构和算法分析

跳跃列表(Skip List)与其在Redis中的实现详解

2019-05-16  本文已影响25人  LittleMagic

目录

引子

跳跃列表(Skip List),简称跳表。在前面我写过的一篇讲解HBase MemStore的文章中曾经提到,MemStore(其实也包括BlockCache)的基础数据结构都是JUC包提供的并发跳表ConcurrentSkipListMap。当时我就立flag说跳表是个有意思的数据结构,会写文章专门分析它,那么今天就来兑现承诺吧。

认识跳表

跳表的提出

跳表首先由William Pugh在其1990年的论文《Skip lists: A probabilistic alternative to balanced trees》中提出。由该论文的题目可以知道两点:

看官可能会觉得这两点与之前文章中讲过的布隆过滤器有些相似,但实际上它们的原理和用途还是很不相同的。

由二叉树回归链表

考虑在有序序列中查找某个特定元素的情境:

我们知道,普通BST插入元素越有序效率越低,最坏情况会退化回链表。因此很多大佬提出了自平衡BST结构,使其在任何情况下的增删查操作都保持O(logn)的时间复杂度。自平衡BST的代表就是AVL树、Splay树、2-3树及其衍生出的红黑树。如果推广之,不限于二叉树的话,我们耳熟能详的B树和B+树也属此类,常用于文件系统和数据库。

自平衡BST显然很香,但是它仍然有一个不那么香的点:树的自平衡过程比较复杂,实现起来麻烦,在高并发的情况下,加锁也会带来可观的overhead。如AVL树需要LL、LR、RL、RR四种旋转操作保持平衡,红黑树则需要左旋、右旋和节点变色三种操作。下面的动图展示的就是AVL树在插入元素时的平衡过程。

那么,有没有简单点的、与自平衡BST效率相近的实现方法呢?答案就是跳表,并且它简单很多,下面就来看一看。

设计思想与查找流程

跳表就是如同下图一样的许多链表的集合。


跳表具有如下的性质:

很显然这是一种空间换时间的思路,与索引异曲同工。第k层可以视为第k-1级索引,用来加速查找。为了避免占用空间过多,第1层之上都不存储实际数据,只有指针(包含指向同层下一个元素的指针与同一个元素下层的指针)。

当查找元素时,会从最顶层链表的头节点开始遍历。以升序跳表为例,如果当前节点的下一个节点包含的值比目标元素值小,则继续向右查找。如果下一个节点的值比目标值大,就转到当前层的下一层去查找。重复向右和向下的操作,直到找到与目标值相等的元素为止。下图中的蓝色箭头标记出了查找元素21的步骤。

通过图示,我们也可以更加明白“跳表”这个名称的含义,因为查找过程确实是跳跃的,比线性查找省时。若要查找在高层存在的元素(如25),步数就会变得更少。当数据量越来越大时,这种结构的优势就更加明显了。

插入元素的概率性

前文已经说过,跳表第k层的元素会按一定的概率p在第k+1层出现,这种概率性就是在插入过程中实现的。

当按照上述查找流程找到新元素的插入位置之后,首先将其插入第1层。至于是否要插入第2,3,4...层,就需要用随机数等方法来确定。最通用的实现方法描述如下。

int randomizeLevel(double p, int lmax) {
    int level = 1;
    Random random = new Random();
    while (random.nextDouble() < p && level < lmax) {
        level++;
    }
    return level;
}

得到层数k之后,就将新元素插入跳表的第1~k层。由上面的逻辑可知,随着层数的增加,元素被插入高层的概率会指数级下降。

下面的动图示出以p=1/2概率在跳表中插入元素的过程。这种方法也被叫做“抛钢镚儿”(coin flip),直到抛出正面/反面就停止。

相对于插入而言,删除元素没有这么多弯弯绕,基本上就是正常的单链表删除逻辑,因此不再展开。

复杂度分析

只要我们找出平均查找长度,自然就可以知道跳表的平均时间复杂度了。为了便于分析,我们将查找元素的流程反过来,从目的元素向头节点看,也就是变成向上、向左的操作。由于随机层数的计算是相互独立的,因此这样做并无妨。

假设从层数为l的某节点x出发,距离最顶层有k层。如果x有第l+1层的指针就向上走,没有的话就向左走。它们的概率分别为p与1-p。借用William Pugh画好的图如下。


设C[k]为向上走k层要走过的路径长度期望值,就有:C[0] = 0 ; C[k] = (1-p)·(1+C[k]) + p·(1+C[k-1]),最终可得:C[k] = k/p。

接下来设跳表中包含n个元素,那么第1层就有n个元素,第2层平均有np个,第3层平均有np2个……依次类推。可知,跳表层数的期望值为log1/pn。结合上面得出的结论,跳表的平均查找长度就是:[log1/pn - 1] / p。

也就是说,跳表增删查的平均时间复杂度为O(logn),达到了平衡BST的水准。当然它毕竟是不稳定的,如果运气奇差无比,总是无法建立起层级结构的话,最坏时间复杂度仍然会退化到O(n)级别,但这个概率是非常微小的了。

由上面的分析我们还可以知道,跳表中单个节点的层数为1的概率为1-p,层数为2的概率为p(1-p),层数为3的概率为p2(1-p)……依次类推。所以,单个节点层数的期望为:(1-p) · ∑[k=1→+∞](kpk-1) = 1/(1-p)。这也是单个节点中指针数量的平均值,列表如下。

可见,p越小,节点的空间开销也就越小,但(正规化的)查找长度会相应越大。

Redis的跳表实现

跳表在很多框架中都有广泛的应用,除Java并发包及HBase之外,比较著名的是Redis和leveldb。之前一直读Java系的源码,有些腻烦了,并且我最近正好在研究一些Redis调优方面的事情,就干脆拿Redis 4.0.14的源码来讲讲吧。

从zset到zskiplist

跳表在Redis中称为zskiplist,是其提供的有序集合(sorted set/zset)类型底层的数据结构之一。zset的定义如下,位于server.h中。

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

可见,除了zskiplist之外,zset还使用了KV哈希表dict。Redis中有序集合的默认实现其实是更为普遍的ziplist(压缩双链表),但在redis.conf中有两个参数可以控制它转为zset实现。

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

也就是说,当有序集合中的元素数超过zset-max-ziplist-entries时,或其中任意一个元素的数据长度超过zset-max-ziplist-value时,就由ziplist自动转化为zset。具体逻辑参见t_zset.c中的zsetConvert()函数,不再赘述。

扯远了,回来看看zskiplist,它的定义就在zset上面。

typedef struct zskiplistNode {
    robj *obj;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

zskiplist的节点定义是结构体zskiplistNode,其中有以下字段。

zskiplist就是跳表本身,其中有以下字段。

下图示出一个length=3,level=5的zskiplist。

可见,zskiplist的第1层是个双向链表,其他层仍然是单向链表,这样做是为了方便可能的逆向获取数据的需求。

另外,节点中还会保存前向指针跳过的节点数span,这是因为zset本身支持基于排名的操作,如zrevrank指令(由数据查询排名)、zrevrange指令(由排名范围查询数据)等。如果有span值的话,就可以方便地在查找过程中累积出排名了。

以上是zskiplist相对于前述的传统跳表的两点不同,并且都给我们带来了便利。下面我们来继续读代码,看看它的部分具体操作。

创建zskiplist

zslCreate()函数位于t_zset.c中。

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

并不难懂,唯一需要注意的是常量ZSKIPLIST_MAXLEVEL,它定义了zskiplist的最大层数,值为32,这也是上节图中的节点最高只到L32的原因。

向zskiplist插入元素

插入元素靠的是zslInsert()函数,有点长。

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

该方法涉及到的细节很多,其大致执行流程如下:

其中维护了两个数组update和rank。update数组用来记录每一层的最后一个分数小于待插入score的节点,也就是插入位置。rank数组用来记录上述插入位置的上一个节点的排名,以便于最后更新span值。

随机计算层数的zslRandomLevel()方法如下。

int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

注意p值由ZSKIPLIST_P常量定义,值为0.25,即被插入到高层的概率为1/4。

下图示出插入元素的流程。


查询元素排名/获取排名对应元素

从zslGetRank()和zslGetElementByRank()这两个函数可以更明显地看出通过累积span字段的值获取到排名的操作。代码本身比较容易理解,如下所示。

unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        if (x->obj && equalStringObjects(x->obj,o)) {
            return rank;
        }
    }
    return 0;
}

zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
Redis作者对采用跳表的解释

比起我多费口舌,不如来看看Salvatore Sanfillipo(@antirez)本人说的话。他多年之前在Hacker News的一篇帖子上解释了自己为什么要在Redis中用跳表而不是树,原文如下,浅显易懂,就不翻译了:

  1. They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.
  2. A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.
  3. They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.

The End

谢谢食用~

上一篇 下一篇

猜你喜欢

热点阅读