定时器

2023-02-25  本文已影响0人  大海无垠_af22

基于set实现

set有有序表,因此基于时间可以做到一个有序的任务列表,方便的实现添加、删除和查询功能。

struct TimerItem {
    list_head link;
    int64_t target_time;
};

template<class T, TimerItem T::*timer_item = &T::timer_item>
class TimerBySet {
    using Unit = int64_t;
    inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
    inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
    inline static list_head* get_link(T* t) {
        return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
    }
    inline static const list_head* get_link(const T* t) {
        return &((t->*timer_item).link);
    }
    struct Compare {
        bool operator()(T*c, T*r) const {
            auto cus = get_timeout(c), rus = get_timeout(r);
            if (cus != rus) return cus < rus;
            return reinterpret_cast<const void*>(c) < reinterpret_cast<const void*>(r);
        }
    };
public:
    void init(Unit) {}
    bool empty() const {
        return items_.empty();
    }
    size_t size() const {
        return items_.size();
    }
    Unit min_time() const {
        if (items_.empty()) return 0;
        auto itr = *items_.begin();
        return get_timeout(itr);
    }
    void insert(Unit time_us, T* t) {
        set_timeout(t, time_us);
        items_.insert(t);
    }
    void remove(T* t) {
        items_.erase(t);
    }
    void pop(Unit now, list_head* result_list) {
        result_list->clear();
        if (empty() || min_time() > now) return;
        while (!items_.empty() && get_timeout(*items_.begin()) <= now) {
            T* t = *items_.begin();
            items_.erase(items_.begin());
            set_timeout(t, 0);
            result_list->push_back(get_link(t));
        }
    }

private:
    std::set<T*, Compare> items_;
};

基于堆

小根堆可以很方便得到最小值,插入和删除也是O(log(N))复杂度。

struct TimerHeapItem: public TimerItem {
    size_t index;
};

template<class T, TimerHeapItem T::*timer_item = &T::timer_item>
class TimerByHeap {
    using Unit = int64_t;
    inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
    inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
    inline static void set_index(T* t, size_t index) { (t->*timer_item).index = index;}
    inline static size_t get_index(const T* t) { return (t->*timer_item).index;}
    inline static list_head* get_link(T* t) {
        return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
    }
    inline static const list_head* get_link(const T* t) {
        return &((t->*timer_item).link);
    }
public:
    void init(Unit) {}
    bool empty() const { return heap_.empty();}
    size_t size() const { return heap_.size();}
    Unit min_time() const {
        if (empty()) return 0;
        return get_timeout(heap_[0]);
    }
    void insert(Unit tp, T* t) {
        assert(tp > 0);
        set_timeout(t, tp);
        set_index(t, size());
        heap_.push_back(t);
        _adjust(size() - 1);
    }
    void remove(T* t) {
        size_t index = get_index(t);
        if (unlikely(index >= size())) {
            return;
        }
        set_index(t, std::numeric_limits<size_t>::max());
        if (unlikely(index + 1 == size())) {
            heap_.back() = nullptr;
            heap_.pop_back();
            return;
        }
        t = heap_[index] = heap_.back();
        set_index(t, index);
        heap_.back() = nullptr;
        heap_.pop_back();
        _adjust(index);
    }
    void pop(Unit now, list_head* result_list) {
        result_list->clear();
        while (!empty() && get_timeout(heap_[0]) <= now) {
            T* t = heap_[0];
            remove(t);
            result_list->push_back(get_link(t));
        }
    }

private:
    bool _less_than(size_t a, size_t b) {
        return get_timeout(heap_[a]) < get_timeout(heap_[b]);
    }
    void _swap(size_t a, size_t b) {
        set_index(heap_[a], b);
        set_index(heap_[b], a);
        std::swap(heap_[a], heap_[b]);
    }
    void _adjust(size_t index) {
        assert(index < heap_.size());
        // adjust up
        for (size_t pos = index; pos > 0; ) {
            size_t father = (pos - 1) >> 1;
            if (!_less_than(pos, father)) {
                break;
            }
            _swap(pos, father);
            pos = father;
        }
        // adjust down
        for (;index < size();) {
            size_t left = 2 * index + 1, right = left + 1;
            if (left >= size()) break;
            if (right < size() && _less_than(right, index) && _less_than(right, left)) {
                _swap(index, right);
                index = right;
                continue;
            }
            if (_less_than(left, index)) {
                _swap(index, left);
                index = left;
                continue;
            }
            break;
        }
    }
    std::deque<T*> heap_;
};

基于时间轮

时间轮本质上是利用时间的整数特性实现的特殊的哈希表。
这里的采用int64表示时间,每6位二进制做一个时间轮,总计11个轮,每个轮有2^6=64个分片。插入的时间按照高位到低位和now对比,第一个不同的轮就是需要插入的位置,再取轮中的6位插入对应的分片中。删除时也能从时间反推到插入的位置,由于采用双向链表结构,插入和删除都是常量时间复杂度。pop和查询min_time()相对麻烦,耗时更多。考虑到插入和删除是频率最高的调用,因此这个耗时在接受范围内。为了加快pop和查询,同时还采用位图记录每个分片是否为空,通过位图的高效查找跳过空的分片。
位图的代码如下。

inline size_t high_zero_bit_num(uint64_t num) {
    return __builtin_clzll(num);
}
inline size_t low_zero_bit_num(uint64_t num) {
    return __builtin_ctzll(num);
}

template<size_t BitNum>
class BitMap {
public:
    BitMap() {
        clear();
    }
    void clear() {
        for (size_t i=0; i< kUnitNum;++i) bitmap_[i] = 0;
    }
    inline void clear_bit(size_t nth) {
        if (unlikely(nth >= BitNum)) return;
        size_t idx = nth >> kBitmapShift;
        size_t bit = nth & kUnitMask;
        bitmap_[idx] &= ~(uint64_t(1) << bit);
    }
    inline void set_bit(size_t nth) {
        if (unlikely(nth >= BitNum)) return;
        size_t idx = nth >> kBitmapShift;
        size_t bit = nth & kUnitMask;
        bitmap_[idx] |= uint64_t(1) << bit;
    }
    size_t size() const { return BitNum;}
    inline bool get_bit(size_t nth) const {
        if (unlikely(nth >= BitNum)) return false;
        size_t idx = nth >> kBitmapShift;
        size_t bit = nth & kUnitMask;
        return (bitmap_[idx] >> bit) & 1;
    }
    // [start, end)
    // return nth where get_bit(nth) == true, else return end if not found
    inline size_t find_first_set_bit(size_t start = 0, size_t end = BitNum) const {
        if (unlikely(end > BitNum)) {
            end = BitNum;
        }
        if (unlikely(start >= end)) {
            return end;
        }
        size_t idx_start = start >> kBitmapShift;
        size_t bit_start = start & kUnitMask;
        size_t idx_end = end >> kBitmapShift;
        size_t bit_end = end & kUnitMask;
        uint64_t unit = (bitmap_[idx_start] >> bit_start) << bit_start;
        size_t zn = low_zero_bit_num(unit);
        if (zn < kBitmapUnit) {
            return (idx_start<<kBitmapShift) + zn;
        }
        for (size_t k=idx_start+1; k<idx_end; ++k) {
            zn = low_zero_bit_num(bitmap_[k]);
            if (zn == kBitmapUnit) {
                continue;
            }
            return (k<<kBitmapShift) + zn;
        }
        if (idx_end >= kUnitNum) {
            return end;
        }
        zn = low_zero_bit_num(bitmap_[idx_end]);
        if (zn < bit_end) {
            return (idx_end<<kBitmapShift) + zn;
        }
        return end;
    }

private:
    static constexpr size_t kBitmapShift = 6;
    static constexpr size_t kBitmapUnit = 1 << kBitmapShift;
    static constexpr size_t kUnitMask = kBitmapUnit - 1;
    static constexpr size_t kUnitNum =
        (BitNum + kBitmapUnit - 1) >> kBitmapShift;
    std::array<uint64_t, kUnitNum> bitmap_;
};

时间轮代码如下。

struct TimerItem {
    list_head link;
    int64_t target_time;
};

template<class T, TimerItem T::*timer_item = &T::timer_item>
class TimeWheel {
private:
    // timer = [wheel1, wheel2, ...]
    // wheel = [slot1, slot2, ... slot_64]
    using Unit = int64_t;
    static constexpr size_t kUnitBits = 8 * sizeof(Unit);
    static constexpr size_t kSlotBits = 6;
    static constexpr size_t kSlotNum = 1 << kSlotBits;
    static constexpr size_t kWheels = (kUnitBits + kSlotBits - 1)/kSlotBits;
    static constexpr size_t kSlotAll = kWheels * kSlotNum;
    Unit now_{};
    size_t size_{};
    BitMap<kSlotAll> slot_bitmap_;
    list_head timeouts_{};
    std::array<list_head, kSlotAll> items_;

    inline static void set_timeout(T* t, Unit u) { (t->*timer_item).target_time = u;}
    inline static Unit get_timeout(const T* t) { return (t->*timer_item).target_time;}
    inline static list_head* get_link(T* t) {
        return const_cast<list_head*>(get_link(static_cast<const T*>(t)));
    }
    inline static const list_head* get_link(const T* t) {
        return &((t->*timer_item).link);
    }
    inline static T* from_link(list_head* h) {
        return const_cast<T*>(from_link(static_cast<const list_head*>(h)));
    }
    inline static const T* from_link(const list_head* h) {
        static const size_t off =
            reinterpret_cast<size_t>(get_link(static_cast<const T*>(nullptr)));
        if (h == nullptr) return nullptr;
        const char* ptr = reinterpret_cast<const char*>(h) - off;
        const T* t = reinterpret_cast<const T*>(ptr);
        return t;
    }
    inline size_t slot_in_wheel(Unit u, size_t wheel) const {
        // assert(wheel < kWheels);
        return (u >> (kSlotBits * (kWheels - 1 - wheel))) & (kSlotNum - 1);
    }
    inline size_t wheel_at(Unit u) const {
        size_t zero_bit_n = high_zero_bit_num(now_ ^ u);
        assert(zero_bit_n < kUnitBits);
        return (zero_bit_n + kWheels * kSlotBits - kUnitBits) / kSlotBits;
    }
    inline size_t slot_in_global(Unit u) const {
        size_t wheel = wheel_at(u);
        size_t index = (wheel << kSlotBits) | slot_in_wheel(u, wheel);
        return index;
    }
    inline void _remove(list_head* h, list_head* result_list) {
        remove(from_link(h));
        result_list->push_back(h);
    }
    inline void _insert_to_slots(Unit tp, list_head* node) {
        auto global_slot = slot_in_global(tp);
        list_head& h = items_[global_slot];
        bool empty = h.empty();
        h.push_back(node);
        if (empty) {
            slot_bitmap_.set_bit(global_slot);
        }
    }
    inline void _pop_to_list(list_head* result_list, list_head* head) {
        for (list_head* h=head->next; h != head; h=head->next) {
            _remove(h, result_list);
        }
    }
    // [start_slot, end_slot)
    inline void _pop_wheel(size_t wheel, size_t end_slot, list_head* result_list) {
        size_t global_base_slot = wheel << kSlotBits;
        size_t start_slot = slot_in_wheel(now_, wheel);
        size_t start_global = global_base_slot + start_slot;
        size_t end_global = global_base_slot + end_slot;
        for(size_t i = start_global; i < end_global; ++i) {
            i = slot_bitmap_.find_first_set_bit(i, end_global);
            if (i == end_global) {
                break;
            }
            list_head* head = &items_[i];
            _pop_to_list(result_list, head);
            slot_bitmap_.clear_bit(i);
        }
    }
    inline bool _find_min_time(size_t wheel, Unit* tp) const {
        size_t slot_base = wheel << kSlotBits;
        size_t slot_end = (wheel+1) << kSlotBits;
        size_t slot_start = slot_in_wheel(now_, wheel) + 1 + slot_base;
        size_t global_slot = slot_bitmap_.find_first_set_bit(slot_start, slot_end);
        if (global_slot >= slot_end) {
            return false;
        }
        *tp = now_ & ~((Unit(1)<<(kSlotBits*(kWheels - wheel))) - 1);
        *tp += (global_slot - slot_base) << (kSlotBits * (kWheels - wheel - 1));
        return true;
    }

public:
    TimeWheel() = default;
    void debug_check_bitmap() const {
        for(size_t i=0;i<kSlotAll;++i){
            if (items_[i].empty()){
                log_assert(!slot_bitmap_.get_bit(i))<<"bit not 0, i="<<i;
            } else {
                log_assert(slot_bitmap_.get_bit(i))<<"bit not 1, i="<<i;
            }
        }
    }
    void init(Unit now) {
        assert(now >= 0);
        now_ = now;
        slot_bitmap_.clear();
    }
    bool empty() const { return 0 == size_;}
    size_t size() const { return size_;}
    // return mt, all timer after mt
    // if some timer timeouts or no timer, return now
    Unit min_time() const {
        if (empty() || !timeouts_.empty()) return now_;
        Unit tp = 0;
        for (size_t w=kWheels; w > 0 && !_find_min_time(w-1, &tp);--w);
        return tp;
    }
    void insert(Unit tp, T* t) {
        if (unlikely(t == nullptr)) {
            return;
        }
        auto node = get_link(t);
        assert(node->empty());
        assert(tp > 0);
        set_timeout(t, tp);
        ++size_;
        if (tp <= now_) {
            timeouts_.push_back(node);
            return;
        }
        _insert_to_slots(tp, node);
    }
    void remove(T* t) {
        if (unlikely(t == nullptr)) {
            return;
        }
        list_head* th = get_link(t);
        auto tp = get_timeout(t);
        if (th->empty() || tp < 0) return;
        set_timeout(t, 0);
        th->remove();
        assert(size_ > 0);
        --size_;
        if (tp > now_) {
            size_t global_slot = slot_in_global(tp);
            if (items_[global_slot].empty()) {
                slot_bitmap_.clear_bit(global_slot);
            }
        }
    }
    void pop(Unit now, list_head* result_list) {
        result_list->remove();
        if (now_ > now) {
            return;
        }
        if (now == now_) {
            _pop_to_list(result_list, &timeouts_);
            return;
        }
        auto start_wheel = wheel_at(now);
        /*
        now_: A1 B1 C1 D1
        now : A1 B2 C2 D2
        */
        for (size_t wheel = kWheels - 1; wheel > start_wheel; --wheel) {
            size_t end_slot = kSlotNum;
            _pop_wheel(wheel, end_slot, result_list);
        }
        size_t end_slot = slot_in_wheel(now, start_wheel);
        _pop_wheel(start_wheel, end_slot, result_list);
        now_ = now;
        size_t target_slot = start_wheel * kSlotNum + end_slot;
        list_head* slot_head = &items_[target_slot];
        while (slot_head->next != slot_head) {
            list_head* node=slot_head->next;
            T* t = from_link(node);
            auto tp = get_timeout(t);
            if (tp <= now) {
                _remove(node, result_list);
                continue;
            }
            _insert_to_slots(tp, node);
        }
        slot_bitmap_.clear_bit(target_slot);
    }

    DISALLOW_COPY(TimeWheel);
};

测试代码

struct TimeoutPerfItem {
    // TimerItem timer_item;
    TimerHeapItem timer_item;
    int64_t get_timeout_us() const { return timer_item.target_time;}
    const list_head* get_timeout_link() const { return &timer_item.link;}
};

// using TestTimer = TimerBySet<TimeoutPerfItem>;
using TestTimer = TimerByHeap<TimeoutPerfItem>;
// using TestTimer = wt::TimeWheel<TimeoutPerfItem>;

static void test_timer_benchmark(size_t num) {
    size_t rm_num = num/2;
    int64_t base_us = wt::steady_us();
    TestTimer tm1{};
    std::vector<TimeoutPerfItem> items;
    items.resize(num);
    auto m1 = wt::System::process_memory_kb();
    auto t0 = wt::steady_us();
    tm1.init(base_us);
    for (size_t i = 0; i < num; i++) {
        tm1.insert(base_us + i * 97, &items[i]);
    }
    auto t1 = wt::steady_us();
    auto m2 = wt::System::process_memory_kb();
    log_info()<<"timer benchmark: num="<<num;
    // insert num:1000000, mem_kb:0
    log_info()<<"insert num:"<<num<<",items_mem_kb:"<<m1<<", mem_kb:"<<(m2-m1);
    // insert cost_ms:8, avg_ns:8
    log_assert(tm1.size() == num)<<"insert, size="<<tm1.size();
    log_info()<<"insert cost_ms:"<<(t1-t0)/1000
        <<", avg_ns:"<<1000*(t1-t0)/num;
    auto t2 = wt::steady_us();
    for (size_t i = 0; i < rm_num;++i) {
        tm1.remove(&items[i]);
    }
    auto t3 = wt::steady_us();
    auto dt_rm = t3 - t2;
    log_assert(tm1.size() == num - rm_num)<<"remove, size="<<tm1.size();
    // remove cost_ms:2, avg_ns:4
    log_info()<<"remove cost_ms:"<<dt_rm/1000
        <<", avg_ns:"<<1000*dt_rm/rm_num;
    auto t4 = wt::steady_us();
    for (size_t i = rm_num; i < num;++i) {
        list_head result;
        tm1.pop(items[i].get_timeout_us(), &result);
        log_assert(result.size() == 1);
        log_assert(result.next == items[i].get_timeout_link());
    }
    auto t5 = wt::steady_us();
    auto dt_pop = t5 - t4;
    log_assert(tm1.size() == 0)<<"pop, size="<<tm1.size();
    // pop cost_ms:40, avg_ns:81
    log_info()<<"pop cost_ms:"<<dt_pop/1000
        <<", avg_ns:"<<1000*dt_pop/(num - rm_num);

    tm1.init(base_us);
    for (size_t i = rm_num; i < num; i++) {
        tm1.insert(base_us + i * 97, &items[i]);
    }
    // tm1.debug_check_bitmap();
    auto t6 = wt::steady_us();
    for (size_t i = rm_num; i < num;++i) {
        auto min_time = tm1.min_time();
        log_assert(min_time > items[i-1].get_timeout_us()
                && min_time <= items[i].get_timeout_us()) << "i=" <<i
            <<",min_time="<<min_time<<",expect="<<items[i].get_timeout_us();
        list_head result;
        tm1.pop(items[i].get_timeout_us(), &result);
        log_assert(result.size() == 1);
        log_assert(result.next == items[i].get_timeout_link());
    }
    auto dt7 = wt::steady_us() - t6;
    auto dt_mintime = std::max(dt7, dt_pop) - dt_pop;
    // min_time cost_ms:11, avg_ns:22
    log_info()<<"min_time cost_ms:"<<dt_mintime/1000
        <<", avg_ns:"<<1000*dt_mintime/(num - rm_num);
}

DefineBoolArg(enable_test_timer, "run test_timer()");
static void test_timer() {
    if (!enable_test_timer) return;
    for (size_t num: {10, 100, 1000, 2000}) {
        test_timer_benchmark(10000 * num);
    }
}

性能对比结果

实现类型 num insert remove pop min_time 内存/KB
基于Set 1万 147 74 50 0 4804
100万 186 103 40 0 42240
1000万 239 124 31 0 421872
2000万 285 132 35 2 468596
基于堆 1万 27 301 338 0 1108
100万 15 299 283 8 7392
1000万 14 398 422 0 73656
2000万 15 458 462 0 147704
基于时间轮 1万 4 4 53 7 0
100万 4 4 52 3 0
1000万 4 4 51 18 0
2000万 5 4 65 7 0

结论:基于set的实现插入耗时巨大,应该是申请内存和节点遍历的原因,删除耗时次之,可见节点太多后树太高,遍历耗时,pop只删除起始节点,耗时相对小一点,min_time()查询很快。set耗内存很高,1千万定时器内存达到422MB。
结论:基于堆的实现deque由大块内存组织而成,插入又是固定在尾巴,耗时很小;pop和删除总是在头部节点,删除后将尾部置换到头部再重新调整,需要调整堆的所有层,因此耗时巨大;min_time()没有耗时,符合预期。1千万定时器内存消耗74MB,相对set小了很多。
结论:基于时间轮的实现插入和删除耗时仅5ns,而且保持在常量时间,符合预期;pop耗时50-60ns,在可接受范围内,min_time查询时间在7ns左右,可以当做常量时间。内存上没有额外开销,完全采用item即可。综上性能上,完胜前两个实现。

上一篇下一篇

猜你喜欢

热点阅读