Disruptor队列 c++
简介
Disruptor是一个线程间通信的框架,即在多线程间共享数据。它是由LMAX公司开发的可信消息传递架构的一部分,以便用非常快速的方法来在多组件之间传递数据。最初是在JAVA上被发明的
原理
Disruptor的实现思路就是每一个数据都会有一个唯一自增的序号,用一个环形队列来存储数据,写入数据的时候先分配出可写的空间序号,然后再慢慢写数据,等到数据写完后再更新一下可读区域。这样因为每个线程独占一块空间写入数据,就不会有线程同步问题,唯一需要同步的地方是分配写入空间和更新可读区域。但是这两个操作都是非常简单的加操作,加锁太浪费,所以Disruptor直接使用原子变量加自旋等待来同步,获取极高的性能。
读取的时候则是先声明想读取的序号。然后就一直等待直到写入数据后更新的可读序号赶上想读取的序号。等待可以是CPU自旋等待或者放弃CPU时间片或者睡眠或者使用条件变量唤醒。此时可读序号之前的数据都是可读的,而读操作也不需要加锁。又可以获得极高的性能。读取后更新一下已读取序号,这样写入着就可以继续重复利用这块空间了。
当更新的可读序号大于想读取的序号的时候更可以批量的一次性读取,免于每次更新已读取序号浪费性能。
类简介
游标,通常用来记录可使用空间的下标。
本质是包装了一个原子int64变量。并在这个变量两边各填充了56个字节的空间,使得一个游标一定能单独占据一个64字节的cacheline。
提供读取、存储和增加值功能,基本是直接调用了std原子变量的基本操作。只不过调用的时候指定了内存屏障类型,尽量将内存屏障影响变小。内部外部都使用。
固定长度的环形缓冲区,本质就是包装了一个std的数组,对外只提供下标取值操作(中括号运算符),会对超过长度的序号自动取余来实现环形复用。内部使用。
用来写入数据的辅助工具类,主要负责分配空闲可写入空间,等待写入空间可使用状态。内部使用。
有两个版本:
- SingleThreadedStrategy:单线程写入版本,写入完成后不做等待。
- MultiThreadedStrategy:多线程写入版本,写入完成后等待之前的空间成为可读状态才继续。有多个线程写入的时候必须要使用此版本。
等待读取数据的工具类,主要负责等待可读取游标是否到达要读取的位置,可中断等待。内部使用。
有四个版本:
- BusySpinStrategy:不放弃cpu死循环等待
- YieldingStrategy:死循环一定次数后调用yield放弃cpu时间片。
- SleepingStrategy:死循环一定次数后重复调用yield放弃cpu时间片。调用yield一定次数后重复调用sleep睡眠指定时间。
- BlockingStrategy:阻塞等待,内部使用了条件变量,需要写入的时候唤醒。
WaitStrategy的封装,但是保存了队列的游标状态数据。用来在等待可读数据游标到达指定位置,外部使用。
包含了RingBuffer、Sequence、WaitStrategy、ClaimStrategy的封装类。统一封装了对外接口。
使用
构建Sequencer对象。Sequencer是一个模板类,可在模板参数中选择存储对象的类型、内部环形队列的长度、写入策略(ClaimStrategy版本)、读取策略(WaitStrategy类型)。
注意:模板参数中长度参数默认是1024,如果要设为其他值,那么必须明确指明ClaimStrategy类型的模板参数,并且设为同样的长度值
首先调用Sequencer对象的Claim方法传入长度参数分配指定长度的写入空间,此方法返回的是可以写入的数据序号的最大一个。直接使用序号作为下标访问向队列中填入数据(注意返回的是最大的序号,要先减去长度然后依次写)。然后调用Sequencer对象的Publish方法传入刚才Claim方法返回值和长度参数,此时就完成了写入数据的完整过程。
注意:因为内部是环形缓冲,如果写入的空间不足并且不加控制就会覆盖最前面的空间,如果其中有未读取的数据就会丢失
调用Sequencer对象的NewBarrier方法创建一个用来读取数据的SequenceBarrier对象指针。这个对象将用来等待数据的到来和读取数据
注意:这个对象最后使用结束后需要调用delete删除。
调用SequenceBarrier对象的WaitFor方法,传入期望获取的数据的序号。当队列中的可读数据序号超过或者等于传入的序号的时候,方法就会返回。返回值就是可读数据的最大的序号。使用序号作为下标即可读取数据。
注意:此处返回的是最大的可读数据序号,但是,没有办法知道从哪个序号开始的数据是没读过的,也就是说有可能此时你期望获取的数据已经被其他线程处理过了或者因为来不及处理已经被覆盖了。解决方法见下文
门限序列?翻译不准,大概就是这个意思吧。这个东西就是用来控制写入者不要覆盖未读取的数据的。免得数据来不及取出处理就被覆盖掉了。
首先每一个读取线程都需要维护一个Sequence对象来记录当前这个线程处理的进度,然后调用Sequencer对象的set_gating_sequences方法,传入一个存储每个线程进度的Sequence对象指针的vector的引用。然后在读取数据的地方,每次处理完数据后将已经处理完的数据序号更新到Sequence对象里。Claim方法内部就会每次检查更新后的Sequence对象,一直等待此处区域已经被处理完毕后在写入,防止覆盖未处理的数据。
如果不进行此步骤,写入操作会无限制的覆盖之前的数据。
Claim方法内部检查的是vector中最小的一个Sequence,如果有多个线程在读取的话,要把每个线程记录处理进度的Sequence对象指针放进vector里。
首先给出结论,多线程读取时每一条消息都会被所有的读取线程读取一遍。也就是说任何一条读取线程都不可能越过已经被其他线程读取过的数据。你期望获取的数据已经被其他线程处理过了这个问题是无解的。除非使用1条读取线程。
Disruptor在设计之初就是这么设计的,它遵循的是一种pub-sub的发布者-订阅者的分发模型。发布者发送的每一条消息每个订阅者都能够收到。
Disruptor的原始设计是支持几步具有特定顺序的串行流水线操作 - 这种操作在企业级的系统里很常见。
看如下的例子:
首先,所有事件都会写入硬盘(日志操作),以便容灾恢复。第二所有事件会备份(存储操作)到第二台服务器上,只有这些步骤都完成之后系统才能处理实际的业务操作(业务逻辑)。
串行做这三步操作是一个合理的做法,但不是最有效率的。日志和备份操作可以并行,因为它们相互独立。但业务操作不行,因为它依赖前两者。
如果使用Disruptor,前两步(日志和备份)可以直接并行的处理,用两个线程从同一个RingBuffer里读取事件,业务线程也会从同一个RingBuffer里读取事件,不过只能处理前两个线程处理完的事件,此处需要使用依赖功能。
如上面的例子中最后所说的那样,Disruptor提供了一个依赖功能使得一条读取线程读取的数据可以受到其他读取线程的控制。
调用Sequencer对象的NewBarrier方法创建一个用来读取数据的SequenceBarrier对象的时候需要传入一个参数,这个参数是一个Sequence对象指针的vector,其中的vector里存放的就是要使用这个SequenceBarrier对象的线程所依赖的其他读取线程所维护的序号游标。如果没有依赖传入一个空的vector即可。
当调用SequenceBarrier对象WaitFor方法时,如果创建时传入的是空vector,那么就会将期望读取的数据序号和队列中的当前可读序号相比较。而如果传入的vector不为空,就会将期望读取的数据序号和vector中的Sequence对象所指代的其他线程已经处理过的序号相比较。这样就形成了多线程读取的依赖。
经我验证,C++版本的阻塞等待工具类BlockingStrategy的实现有错误,无法唤醒,所以不要使用BlockingStrategy作为等待策略。
参考https://blog.csdn.net/bjrxyz/article/details/53084539