ZeroMQ源码解析之yqueue

2016-10-12  本文已影响373人  分享放大价值

What is yqueue?

yqueue是ZMQ底层实现的一个高效的队列数据结构。它的主要特点有两个:

注意:用户必须保证不在空的yqueue上进行出队(pop)操作,并且在未进行同步的情况下,生产者和消费者不应该同时访问同一个元素。

yqueue API

ZMQ实现的yqueue提供了如下几个接口:

template <typename T, int N> class yqueue_t {
public:
    // 获取队头元素的引用
    inline T &front();
    // 获取队尾元素的引用
    inline T &back();
    // 向队列插入元素
    inline void push();
    // 撤销最近一次得插入操作
    inline void unpush();
    // 将队头出列
    inline void pop();
}

ZMQ的队列操作和C++的queue“很不一样”,在使用该数据结构时要非常小心。

Implementation

yqueue实质上是对如下结构体的封装:

private:
    struct chunk_t {
        T values [N];
        chunk_t *prev;
        chunk_t *next;
    };

yqueue所有队列的操作都是通过下面的几个指针来完成的:

private:
    chunk_t *begin_chunk;
    int begin_pos;
    chunk_t *back_chunk;
    int back_pos;
    chunk_t *end_chunk;
    int end_pos;

yqueue的生产者和消费者不用加锁是通过一个原子指针实现的:

private:
    template <typename T> class atomic_ptr_t {
    public:
        // 非线程安全,在初始化时用来为指定初值
        inline void set(T *ptr);
        // 原子“交换”操作,将值设置为val并返回之前的值
        inline T *xchg(T *val);
        // 原子的“对比 & 交换”操作,若旧值与cmp相等,边赋值为val;
        // 否则旧值不变。最后返回旧值。
        inline T *cas(T *cmp, T*val);
    private:
        volatile T *ptr;
    };
    atomic_ptr_t<chunk_t> spare_chunk;

yqueue之所以能极大程度的减少内存分配,并不仅仅是选取恰当的N值,还得益于底层实现采用的“内存重用”技术:

inline void push() {
    // 将back()依赖的指针定位到yqueue尾部,以便push()调用后执行back()操作
    back_chunk = end_chunk;
    back_pos = end_pos;

    // 当前的块仍有空间,不需要扩容
    if (++end_pos != N)
        return;

    // 获取原子指针的值,并将原子指针的值设为NULL
    // 如果有pop()操作造成的废弃的块,yqueue不会马上释放
    // 而是用原子指针保存,以便内存重用
    chunk_t *sc = spare_chunk.xchg(NULL);
    if (sc) {
        // 若此时有可重用的内存,则sc非空
        end_chunk->next = sc;
        sc->prev = end_chunk;
    } else {
        // sc == NULL表示没有可重用内存,此时需要malloc动态分配
        end_chunk->next = (chunk_t*)malloc(sizeof(chunk_t));
        end_chunk->next->prev = end_chunk;
    }
    // 将队尾移到扩容后的块上
    end_chunk = end_chunk->next;
    end_pos = 0;
}

inline void pop() {
    if (++begin_pos == N) {
        // 当前块已全部出队(废弃),用临时指针o指向
        chunk_t *o = begin_chunk;
        // 将队头指向下一块的开始,剔除旧块,并复位偏移量
        begin_chunk = begin_chunk->next;
        begin_chunk->prev = NULL;
        begin_pos = 0;

        // 根据“最近经常使用”内存分页算法,将O设为重用块效率最高
        // 也就是说,如果马上扩容时,不需要进行内存页面替换操作
        // geek,利用操作系统内存管理,追求性能到极致
        chunk_t *cs = spare_chunk.xchg(o);
        // 别忘了释放旧的、不用的块,这回是真的没用了
        free(cs);
    }
}

Tips

上一篇下一篇

猜你喜欢

热点阅读