Disruptor-cpp 简介

2018-11-11  本文已影响0人  LiarMaiq

简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,使用JAVA语言编写,号称基于Disruptor开发的系统单线程能支撑每秒600万订单。不过,今天主要介绍的是Disruptor的一个C++版本,github地址:https://github.com/Abc-Arbitrage/Disruptor-cpp java版本:https://github.com/LMAX-Exchange/disruptor

编译安装

首先需要获取Disruptor-cpp源代码,地址在上面“简介”中。需要注意的是该版本的Disruptor引用了C++扩展库boost的部分内容,因此除了Disruptor-cpp本身的源代码以外,还需要额外下载一份boost源代码,地址:https://www.boost.org/

编码示例

利用Disruptor-cpp进行开发,首先需要理解它的基本结构(设计模式),换句话说也就是Disruptor-cpp都包含那些基本组件以及它们之间有着怎样的关系。

  1. Sequence: 表示事件的序号,可以理解为RingBuffer上的一个地址,拿到一个序号就获取了RingBuffer上一个事件操作的权限;
  2. Sequencer: Sequence只是一个事件的序号,要想获取一个序号就需要使用Sequencer向RingBuffer申请,因此Sequencer可以理解为生产者与缓存RingBuffer之间的桥梁;
  3. WaitStrategy: 表示等待策略,如需要发布事件时RingBuffer上已无可用位置或事件处理去获取事件时RingBuffer已无可消费事件时如何进行等待;
  4. SequenceBarrier: 消费者要获取一个事件并非直接访问RingBuffer,而是需要通过SequenceBarrier,因此SequenceBarrier可以理解为消费者与缓存RingBuffer之间的桥梁(注意与Sequencer的对比);
  5. EventProcessor: 事件处理器,是消费者线程池Executor的调度单元,是对事件处理EventHandler与异常处理ExceptionHandler等的一层封装;
  6. Event: 消费事件,具体实现由用户定义;
  7. RingBuffer: 基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口;
  8. Disruptor: Disruptor的使用入口,持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等。
    更加相信信息还可以参考以下两篇文章:
    https://www.cnblogs.com/zhaohongtian/p/6801638.html
    https://www.cnblogs.com/daoqidelv/p/6995888.html

下面是Linux平台下示例代码:

// DisruptorDemo.cpp
#include "Disruptor/Disruptor.h"
#include "Disruptor/ThreadPerTaskScheduler.h"
#include <iostream>
#include <string>
#include <cstdint>
#include <memory>
#include <cstdlib>
#include <time.h>

// 定义事件体
struct Event
{
    int64_t id{ 0 };
    std::string str;
};

// 继承事件处理器接口
class Worker : public Disruptor::IWorkHandler< Event >
{
public:
    explicit Worker(){}
    // 重写事件处理回调函数
    void onEvent(Event& event) override
    {
        usleep(1000);
        _actuallyProcessed++;
    }

private:
    int32_t _actuallyProcessed{ 0 };
};

class Producer
{
public:
    Producer(std::int32_t ringBufferSize, std::int32_t workerCount)
    {
        m_ringBufferSize = ringBufferSize;
        m_workerCount = workerCount;
        // 创建调度器
        m_ptrTaskScheduler = std::make_shared< Disruptor::ThreadPerTaskScheduler >();
        // 创建Disruptor
        m_ptrDisruptor = std::make_shared< Disruptor::disruptor<Event> >([]() { return Event(); }
        , m_ringBufferSize, m_ptrTaskScheduler);
        // 创建事件处理器
        for (size_t i = 0; i < m_workerCount; i++)
        {
            m_workers.push_back(std::make_shared< Worker >());
        }
        // 绑定
        m_ptrDisruptor->handleEventsWithWorkerPool(m_workers);
    }

    ~Producer()
    {
        stop();
    }

    void start()
    {
        m_ptrTaskScheduler->start();
        m_ptrDisruptor->start();
    }

    void push(const Event &event)
    {
        auto ringBuffer = m_ptrDisruptor->ringBuffer();
        auto nextSequence = ringBuffer->next();
        (*ringBuffer)[nextSequence] = event;
        ringBuffer->publish(nextSequence);
    }

protected:
    void stop()
    {
        m_ptrDisruptor->shutdown();
        m_ptrTaskScheduler->stop();
    }

private:
    std::shared_ptr<Disruptor::ThreadPerTaskScheduler> m_ptrTaskScheduler;
    std::shared_ptr<Disruptor::disruptor<Event> > m_ptrDisruptor;
    std::vector<std::shared_ptr<Disruptor::IWorkHandler<Event> > > m_workers;
    std::int32_t m_ringBufferSize{ 1024 };
    std::int32_t m_workerCount{ 1 };
    std::int32_t m_schedulerCount{ 1 };
};


int main(int argc, char* argv[])
{
    timespec t1,t2;
    clock_gettime(CLOCK_MONOTONIC, &t1);

    if(argc < 4)
        return 0;

    Producer producer(atoi(argv[1]), atoi(argv[2]));
    producer.start();

    for (size_t i = 0; i < atoi(argv[3]); i++)
    {
        producer.push(Event());
    }

    clock_gettime(CLOCK_MONOTONIC, &t2);

    std::cout << "event num:" << argv[3] << std::endl;
    std::cout << "worker num:" << argv[2] << std::endl;
    std::cout << "buffer size:" << argv[1] << std::endl;
    std::cout << "time:" << 
    (t2.tv_nsec - t1.tv_nsec) / 1000000 + (t2.tv_sec - t1.tv_sec) * 1000
    << std::endl;
    
    return 0;
}
# DisruptorDemo CMakeLists.txt
cmake_minimum_required(VERSION 2.6)
project(DisruptorDemo)

set(CMAKE_BUILD_TYPE Release)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

add_executable(DisruptorDemo ${CMAKE_SOURCE_DIR}/src/DisruptorDemo.cpp)
target_link_libraries(DisruptorDemo Disruptor boost_system boost_thread boost_chrono)

下表记录了在不同Buffer Size和Worker Num时,程序处理2000个事件的总耗时(单位/毫秒,注意以下数均是在Windows平台下试验得到的,如果是在Linux平台下性能会好很多,但其变化趋势基本相同。另外队列不能设置太大,否则测得时间是不准确的,因为队列过大将导致很多数据被压入队列但实际上并没有被处理完毕,主函数就已经达到第二个计时点,因此队列大小相对于事件数量应该是可忽略的):

队列大小 \ 事件处理器数量 1 5 10 100
2 11391 6859 5922 7156
4 11344 3391 2797 3891
8 12953 2250 2094 2375
32 11782 1937 969 1297

分析表中数据大体能够得出以下结论(仅针对本文的示例程序):

需要注意的是,队列也并非越大越好。因为随着队列的增大,其能够缓冲的事件量也会增加,在面对事件的生成速度大于处理速度时,就会导致大量的事件被缓冲在队列中,从而可能引发事件处理超时的情况;虽然事件处理器数量越大在应对并发量大的情形的时候会表现越好,但同时也会增加系统在线程调度方面的开销,因此选择合适的事件处理器数量也很重要;另外,还应当考虑事件的平均处理时间,如果事件处理耗时较大,应当适当增加事件处理器数量。

上一篇下一篇

猜你喜欢

热点阅读