RxJavaJava服务器端编程编程语言爱好者

并发编程-Disruptor框架

2020-12-05  本文已影响0人  迦叶_金色的人生_荣耀而又辉煌

上一篇 <<<Threadlocal
下一篇 >>>如何优化多线程总结


Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,
或者事件监听模式的实现,每秒处理6百万订单,业务逻辑处理器完全是运行在内存中【所有内存的可见性和正确性都是利用内存屏障或者CAS操作】,使用事件源驱动方式。

Disruptor和BlockingQueue区别

  • 数据结构
    BlockingQueue是链表,Disruptor是环形数组(ringbuffer)
  • 生产者消费者模型
    生产者将数据推送给队列后,BlockingQueue是消费者主动取,Disruptor队列主动发送给消费者
  • 性能方面
    BlockingQueue基于锁实现,效率低;Disruptor使用CAS无锁机制,性能远高于BlockingQueue

Disruptor速度快的原因

a、环形数组结构-----为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
b、元素位置定位------数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
c、无锁设计------每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

核心概念

RingBuffer——Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
底层实现:一个首尾相连的环形数组,上面有个读和写的指针,每次操作完都会+1,一旦越界,即从数组头开始继续读写,长度要求是2^n。



位置计算:(java的mod语法):12 % 10 = 2。
Sequencer——序号管理器,负责消费者/生产者各自序号、序号栅栏的管理和协调;
Sequence——序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况;
SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理;
EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。

核心代码

public static void main(String[] args) {
   // 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理
   ExecutorService executor = Executors.newCachedThreadPool();
   // 2.创建工厂
   EventFactory<LongEvent> eventFactory = new LongEventFactory();
   // 3.创建ringBuffer 大小
   int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
   // 4.创建Disruptor
   Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
         ProducerType.SINGLE, new YieldingWaitStrategy());
   // 5.连接消费端方法
   disruptor.handleEventsWith(new LongEventHandler1());
   disruptor.handleEventsWith(new LongEventHandler2());
   // 6.启动
   disruptor.start();
   // 7.创建RingBuffer容器
   RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
   // 8.创建生产者
   LongEventProducer producer = new LongEventProducer(ringBuffer);
   // 9.指定缓冲区大小
   ByteBuffer byteBuffer = ByteBuffer.allocate(8);
   for (int i = 1; i <= 100; i++) {
      byteBuffer.putLong(0, i);
      producer.onData(byteBuffer);
   }
   //10.关闭disruptor和executor
   disruptor.shutdown();
   executor.shutdown();
}

相关文章链接:
多线程基础
线程安全与解决方案
锁的深入化
锁的优化
Java内存模型(JMM)
Volatile解决JMM的可见性问题
Volatile的伪共享和重排序
CAS无锁模式及ABA问题
Synchronized锁
Lock锁
AQS同步器
Condition
CountDownLatch同步计数器
Semaphore信号量
CyclicBarrier屏障
线程池
并发队列
Callable与Future模式
Fork/Join框架
Threadlocal
如何优化多线程总结

上一篇 下一篇

猜你喜欢

热点阅读