Disruptor-01 初识

2020-06-02  本文已影响0人  rock_fish
Disruptor是什么

线程间通信的消息组件,类似java里的阻塞队列(BlockingQueue),与BlockingQueue的异同:

  1. 同:目的相同,都是为了在同一进程的线程间传输数据。
  2. 异:对消费者多播事件;预分配事件内存;可选无锁
使用场景
  1. 你是否有这样的应用场景,需要高性能的线程间通信的队列?
Disruptor的核心组成
  1. 消息
  2. 存放消息的容器
  3. 消息的生产者
  4. 消息的消费者
Disruptor的核心流程
  1. 构建Disruptor
    1.1 指定ringBuffer的大小,队列的容量是多大
    1.2 指定EventFactory
  2. Disruptor中添加消费者
  3. 启动Disruptor
  4. producer 往Disruptor中投递消息.
  5. Disruptor中有消费后,消费者开始消费消息
Disruptor的入门使用

事件类:

public class LongEvent {
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

事件工厂类:

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

事件消费者

public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println("----------------"+ longEvent.toString());
    }
}

事件转换器

public class LongEventTranslator  implements EventTranslatorOneArg<LongEvent,Long> {

    @Override
    public void translateTo(LongEvent longEvent, long l, Long aLong) {
        longEvent.set(aLong);
    }
}

事件生产者

public class LongEventProducer {
    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void publishData(Long aLong) {
        EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
        ringBuffer.publishEvent(translator, aLong);
    }
}

主流程

public static void main(String[] args) throws InterruptedException {

        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // 事件工厂,用于创建event
        LongEventFactory factory = new LongEventFactory();

        // 指定ringbuf的大小,必须是2的整数倍
        int bufferSize = 1024;

        // 构建一个 Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

        // 给disruptor中添加消费者
        disruptor.handleEventsWith(new LongEventHandler());

        // 启动disruptor
        disruptor.start();

        //-----------万事俱备,只欠消息(消息的生产者投递消息)

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        // 跟blockqueue 比对一下

        for (long l = 0; l<100_0000; l++)
        {
            long startAt = System.currentTimeMillis();
            producer.publishData(l);
            long endAt = System.currentTimeMillis();
            System.out.println(endAt-startAt);
            //Thread.sleep(1000);
        }
    }

感谢你们


解读Disruptor系列-Disruptor官方介绍与入门指南

上一篇 下一篇

猜你喜欢

热点阅读