并发编程-Disruptor框架
上一篇 <<<Threadlocal
下一篇 >>>如何优化多线程总结
Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,
或者事件监听模式的实现,每秒处理6百万订单,业务逻辑处理器完全是运行在内存中【所有内存的可见性和正确性都是利用内存屏障或者CAS操作】,使用事件源驱动方式。
Disruptor和BlockingQueue区别
- 数据结构
BlockingQueue是链表,Disruptor是环形数组(ringbuffer)- 生产者消费者模型
生产者将数据推送给队列后,BlockingQueue是消费者主动取,Disruptor队列主动发送给消费者- 性能方面
BlockingQueue基于锁实现,效率低;Disruptor使用CAS无锁机制,性能远高于BlockingQueue
Disruptor速度快的原因
a、环形数组结构-----为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
b、元素位置定位------数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
c、无锁设计------每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
核心概念
RingBuffer——Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
底层实现:一个首尾相连的环形数组,上面有个读和写的指针,每次操作完都会+1,一旦越界,即从数组头开始继续读写,长度要求是2^n。
位置计算:(java的mod语法):12 % 10 = 2。
Sequencer——序号管理器,负责消费者/生产者各自序号、序号栅栏的管理和协调;
Sequence——序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况;
SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理;
EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。
核心代码
public static void main(String[] args) {
// 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.创建工厂
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.创建ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
// 4.创建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.连接消费端方法
disruptor.handleEventsWith(new LongEventHandler1());
disruptor.handleEventsWith(new LongEventHandler2());
// 6.启动
disruptor.start();
// 7.创建RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.关闭disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
相关文章链接:
多线程基础
线程安全与解决方案
锁的深入化
锁的优化
Java内存模型(JMM)
Volatile解决JMM的可见性问题
Volatile的伪共享和重排序
CAS无锁模式及ABA问题
Synchronized锁
Lock锁
AQS同步器
Condition
CountDownLatch同步计数器
Semaphore信号量
CyclicBarrier屏障
线程池
并发队列
Callable与Future模式
Fork/Join框架
Threadlocal
如何优化多线程总结