Skip List

2020-09-20  本文已影响0人  老杜振熙

redis在有序集合键集群结点中用到了跳转表。那么,还是从跳转表的原始结构开始说起。


Origin

跳转表这一数据结构出自William Pugh的文章^{[1]},援引自论文中的一句话如下面框图中所示。我们首先可以知道,跳转表是为了在一定程度上替代平衡树而产生。替代的机制就是:按概率来保持平衡而不是严格地保持平衡。这句话初读起来有点抽象,但后面我们解析了跳转表的细节原理之后,就能明白这句话里面蕴含的道理。

Skip lists are a data structure that can be used in place of balanced trees.
Balancing a data structure probabilistically is easier than explicitly maintaining the balance.

跳转表的底层还是基于有序链表。我们依旧以文献中的示意图来对其进行讲解。图1(a)是一个原始的有序链表,那么对于元素的查找,最多需要遍历N个结点,N为链表长度。然而,如果我们采取空间换时间的策略,如图1(b)所示,每两个结点里面就有1个结点,设其为第i个结点,该结点拥有2个指针,第1个指针维持原来的指向,用于构成有序链表,但第2个指针则把跨度增大了,其指向第i+2个结点。这样做的好处就是,现在,我们在进行查找的时候,最多只需要遍历\lfloor N/2+1 \rfloor个元素即可。根据这样的思路,我们可以逐渐的加大指针跨度的上限,也就是如图1(c)和图1(d)所示。不必深入研究这种结构的各种细节,我们只需要知道其拥有以下几个特点:① 现在各个结点拥有了一个名为层数的属性,其表示结点拥有的前向指针的个数;② 层数越高的结点,其在链表中所占的比例越低,比如说图1(d)中,层数为1的结点占50%左右,而层数为2的结点的比例则降至25%左右;③第k层的指针的跨度是固定的,永远是指向2^{k-1}之后的那个结点;④ 第i个结点的层数也是固定的(只要链表的最大跨度已经确定)。

图1. 跳转表示意图.png

不要以为上面所说的那种结构就是跳转表,我们不妨将其命名为"结构a"。根据上述总结的特点,我们可以发现,"结构a"的致命缺陷便是,其对于增加和删除结点是无能为力的!因为我们必须要保持特点③和④。举个例子,如果我要删除图1(d)中value为9的那个结点,那么首先,我必须按次序将后面的结点,也就是结点12的层数变为3,结点17的层数变为1,等等等等,以维持特点④......不仅如此,我们还要逐个改变链表指针的指向,以维持特点③。Trouble!!!这显然是一个根本不现实的事情。

那么,到底应该怎样做,才能在保持住"结构a"的优点的同时,还能规避其缺点呢?跳转表!所谓跳转表,其实就是对"结构a"的一些特点做了些许的修改:
1.首先,取消特点④的限制,即第i个结点的层数不再固定,相反,我们利用随机数发生器对结点的层数随机做选择。但需要注意的是,虽然是随机的,也还是要保持特点②所示的分布特性,也就是层数为K的结点,其所占比例要在2^{-K}左右。
2.修改特点③的说法。现在,第k层的指针的跨度不再是固定的,但还是有目的性的,其指向的是下一个至少拥有k层的结点。比如说对于图1(e)中value为6的结点的第3层的指针,那么该指针指向的是下一个至少拥有3层的结点,也就是value为25的那个结点。

做了这样的改变之后,我们再来看看是不是达到了我们的目的。
1.首先,对于特定结点的查找,思路不变,我们从某个结点(可以设为首结点)的顶层指针开始,看其指向的结点的value是不是我们想要的,如果相等,直接返回结点;如果其指向的结点的value更小,那么我们转移到该结点(相当于加速了),继续判断;如果更大,则我们转向当前结点的下一层的指针继续判断。显然,对于查找而言,可以达到我们的目的,即加速处理。
2.其次,对于删除结点而言,我们只需要进行一些指针的接续就行了。比如说对于图1(e)而言,如果我们要删除value为9的元素,那么我们只需要将p_1p_2连接,p_3p_4连接,即可!
3.对于增加结点,和删除节点类似。

可以看出,跳转表的结构,不但加快了查询的速度,而且对于增删操作也相当简洁。而归根结底,这样的优点的原因就是开头的那句话:Balancing a data structure probabilistically is easier than explicitly maintaining the balance. 按概率来保持平衡而不是严格地保持平衡。而这,也就是跳转表的核心机制了。


zskiplist

有了前面关于跳转表的基础,现在再来理解redis中的跳转表zskiplist结构就轻松很多了。zskiplist由多个结点zskiplistNode构成,并附加了一些其他的数据结构。如下面的代码所示,是其结构体定义。zskiplist的结点排序规则的伪代码为:N1.score<N2.score || (N1.score==N2.score&&N1.ele<N2.ele)。可以看出,zskiplist的定义基本上遵循了跳转表的各项机制,在进行结点查询的时候,也是在结点的level数组中从高到低进行查询和跳转。需要注意的是,zskiplist中关于scorerank的区别。

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele; // 结点的名字
    double score; // 结点的分值
    struct zskiplistNode *backward; // 后退指针,指向前一个结点,每个结点有1个后退指针
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[]; // 层;每层拥有一个前进指针和一个跨度,跨度代表该前进指针指向的结点和当前结点距离之差
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail; // 跳转表的头节点和尾结点
    unsigned long length; // 跳转表的长度
    int level; // 跳转表中所有结点的层数的最大值
} zskiplist;

接下来,再结合redis源码中关于结点插入的代码,对其进行理解。代码如下所示,并且本人在阅读的过程中已经加入了一些comment。对结点的插入,关键的地方就是找到那些指向新节点的前向指针,比如说对于图1(e),如果我们想要插入一个分数为8的结点,且新节点的层数为4(根据随机数发生器生成),那么我们就需要找到指针p_1p_3,以及p_5p_6,可以想到的是,每一层只会有一个这样的前向指针。代码中的update数组也就是这里所说的前向指针数组

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL]; // 用于更新指针的span
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header; // 注意,header的层数永远不会小于zskiplist的level变量
    for (i = zsl->level-1; i >= 0; i--) { // 从表的最高层开始,由高至低进行迭代
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0))) // 结点就是通过score和ele来比较大小
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward; // 加速!
        }
        update[i] = x; // update[i]就是第i层的前向指针的位置,即对应了哪个结点
        // 因为在进行结点的插入的时候,还需要对前后各层的指针进行重新赋值,
        // 所以需要记录各层对应的指针是在哪一个结点
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            // 直接从header连到tail(为什么不是连到新的结点?)
            // 因为这是在构造原始的环境啊笨蛋!!!
            update[i]->level[i].span = zsl->length; 
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele); // 根据入参和随机生成的层数新生成一个结点
    for (i = 0; i < level; i++) { // 新节点的层数为level
        x->level[i].forward = update[i]->level[i].forward; // 对新节点的各层前向指针进行赋值
        update[i]->level[i].forward = x; // update中各层的指针也就改为指向新节点

        /* update span covered by update[i] as x is inserted here */
        // 因为新插入了结点,因此update中各层的前向指针的span也就是进行更新
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    // 此处的情况对应于新节点的层数并没有达到zskiplist的最大层数,
    // 因此update中更高层的前向指针的span肯定会加1(因为新添加了一个结点嘛)
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // update[0]肯定是x的前一个结点;
    // 注意,如果新的x结点就是在header之后了,那么x的后退指针不会指向header,而是变为空指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

其他


引用

[1] William Pugh, Skip Lists: A Probabilistic Alternative to Balanced Trees. 1989.

上一篇下一篇

猜你喜欢

热点阅读