Disruptor

2018-12-03  本文已影响20人  java_飞

模式

1.发布订阅模式,同一事件会被多个消费者并行消费
2.点对点模式,同一事件会被一组消费者其中之一消费
3.顺序消费;

使用场景

低延迟,高吞吐量,有界的缓存队列

提高吞吐量,减少并发执行上下文之间的延迟并确保可预测延迟

为什么RingBuffer这么快?

1.首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果。
(就是一个缓存行8个变量,预设7个变量,然后再保存一个唯一变量,这样就不会出现相同的变量)

2.无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。

为什么要用Disruptor?

锁的成本: 传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。
伪共享问题导致的性能低下。
队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。

代码demo

public class MessageEvent<T> {
    private T message;

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }
}
public class MessageEventFactory implements EventFactory<MessageEvent> {

    @Override
    public MessageEvent newInstance() {
        return new MessageEvent();
    }
}
public class MessageEvenHandler3 implements EventHandler<MessageEvent> {
    @Override
    public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
        System.out.println("----------------"+messageEvent.getMessage());

    }
}
public class MessageEventProducer {

    private RingBuffer<MessageEvent> ringBuffer;

    public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(String message) {
        EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator();
        ringBuffer.publishEvent(translator, message);
    }


}
public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {

    @Override
    public void translateTo(MessageEvent messageEvent, long l, String o2) {
            messageEvent.setMessage(o2);
    }
}
public class MessageExceptionHandler implements ExceptionHandler {

    @Override
    public void handleEventException(Throwable throwable, long l, Object o) {
        throwable.printStackTrace();
    }

    @Override
    public void handleOnStartException(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void handleOnShutdownException(Throwable throwable) {
        throwable.printStackTrace();
    }
}
public class MessageThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r,"Simple Disruptor Test Thread");
    }
}
public class MessageConsumer {

    public static void main(String[] args) {
        String message = "Hello Disruptor!";
        int ringBufferSize = 1024;//必须是2的N次方
        Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
//这里用的是单一生成者,如果是多生成者的话是另一种模式,自己的类实现WorkHandler接口,
//然后这边调用    disruptor.handleEventsWithWorkerPool(new MessageEventHandler());
        disruptor.handleEventsWith(new MessageEvenHandler3());
        disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
        RingBuffer<MessageEvent> ringBuffer = disruptor.start();
        MessageEventProducer producer = new MessageEventProducer(ringBuffer);
        IntStream.range(0,20).forEach(x->{
            producer.onData(x+message);
        });
    }
}

下面是实现WorkHandler接口的类

public class MessageEventHandler implements WorkHandler<MessageEvent> {

    @Override
    public void onEvent(MessageEvent messageEvent) throws Exception {
        System.out.println(System.currentTimeMillis()+"------我是1号消费者----------"+messageEvent.getMessage());
    }
}
部分摘自他人的文章,忘记出处了,文章可能有出入,如果有问题,请联系QQ:1107156537
上一篇下一篇

猜你喜欢

热点阅读