Hello Disruptor

2020-02-11  本文已影响0人  陈追风

hello-disruptor

GITHUB - Disruptor

并发编程基础

Atomic系列类 & UnSafe类

Atmoic系列类提供了原子性操作,保障多线程下的安全

UnSafe类的四大作用:

Volatile

J.U.C工具类

AQS锁

线程池核心

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        核心线程数,
        最大线程数,
        存活时间,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(200),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new
                        Thread(r);
                t.setName("order- thread");
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (Thread.NORM_PRIORITY != t.getPriority()) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒绝策略:" + r + " executor:" + executor);
            }
        }
);

AQS架构

      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+

AQS - ReentrantLock

AQS - CountDownLatch

Disruptor Quick Start

  1. 建立一个工厂"Event类, 用于创建Event类实例对象
  2. 需要有一个监听事件类,用于处理数据(Event类)
  3. 实例化Disruptor实例,配置-系列参数,编写Disruptor核心组件
  4. 编写生产者组件,向Disruptor容器中去投递数据

Disruptor核心原理

Disruptor核心 - RingBuffer

GITHUB - Disruptor - Introduction

RingBuffer到底是啥

RingBuffer数据结构深入探究

RingBuffer:基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口

Disruptor:持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等引用

Disruptor核心 - Sequence

Disruptor核心 - Sequencer

Disruptor核心 - Sequence Barrier

Disruptor核心-WaitStrategy

BlockingWaitStrategyDisruptor使用的默认等待策略是BlockingWaitStrategy。内部BlockingWaitStrategy使用典型的锁和条件变量来处理线程唤醒。BlockingWaitStrategy是可用的等待策略中最慢的,但是在CPU使用率方面是最保守的,它将在最广泛的部署选项中提供最一致的行为。但是,再次了解已部署系统可以提高性能。

SleepingWaitStrategy 尝试通过使用简单的繁忙等待循环来保守CPU使用率像BlockingWaitStrategy一样,SleepingWaitStrategy尝试通过使用简单的繁忙等待循环来保守CPU使用率,但在循环中间使用对LockSupport.parkNanos(1)的调用。在典型的Linux系统上,这会将线程暂停大约60µs。但是,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均等待时间会更长。它在不需要低延迟但对生产线程的影响较小的情况下效果最佳。一个常见的用例是异步日志记录。

YieldingWaitStrategy是可在低延迟系统中使用的2种等待策略之一,在该系统中,可以选择刻录CPU周期以提高延迟。 YieldingWaitStrategy将忙于旋转以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。 当需要非常高的性能并且事件处理程序线程的数量少于逻辑核心的总数时,这是推荐的等待策略。注:启用超线程。

BusySpinWaitStrategy是性能最高的“等待策略”,但对部署环境设置了最高的约束。 仅当事件处理程序线程的数量小于包装盒上的物理核心数量时,才应使用此等待策略。 注:超线程应该被禁用。

Disruptor核心-Event

Disruptor核心-EventProcessor

Disruptor核心- EventHandler

Disruptor核心-WorkProcessor

Disruptor高级操作

并行计算

串行操作:使用链式调用的方式

public class Main {
    public static void main(String[] args) throws InterruptedException {
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                Trade::new,
                1024 * 1024,
                es,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.1 串行操作:
        disruptor
                .handleEventsWith(new Handler1())
                .handleEventsWith(new Handler2())
                .handleEventsWith(new Handler3());

        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();

        CountDownLatch latch = new CountDownLatch(1);

        long begin = System.currentTimeMillis();

        es1.submit(new TradePushlisher(latch, disruptor));


        latch.await();    //进行向下

        disruptor.shutdown();
        es.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

并行操作:使用单独调用的方式,可以有两种方式去进行

  1. handleEventsWith方法 添加多个handler实现即可
  2. handleEventsWith方法 分别进行调用
disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
disruptor.handleEventsWith(new Handler1());
disruptor.handleEventsWith(new Handler2());
disruptor.handleEventsWith(new Handler3());
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                Trade::new,
                1024 * 1024,
                es2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.2 并行操作: 可以有两种方式去进行
        //1. handleEventsWith方法 添加多个handler实现即可
        //2. handleEventsWith方法 分别进行调用
        disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
        //      disruptor.handleEventsWith(new Handler2());
        //      disruptor.handleEventsWith(new Handler3());

        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);
        long begin = System.currentTimeMillis();
        es1.submit(new TradePushlisher(latch, disruptor));
        latch.await();    //进行向下
        disruptor.shutdown();
        es2.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

并行计算-多边形高端操作

c1-> c2 -> c4
c1-> c3 -> c4
/**2.3 菱形操作 (一)*/
disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());

/**2.3 菱形操作 (二)*/
EventHandlerGroup<Trade> group = disruptor.handleEventsWith(new Handler1(), new Handler2());
group.then(new Handler3());
s -> h1 -> h2 -> h3
s -> h4 -> h5 -> h3
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        //因为是但消费者模式,有5个handler,所以此处线程池至少要5个
        ExecutorService es = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                Trade::new,
                1024 * 1024,
                es,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        //2 把消费者设置到Disruptor中 handleEventsWith
        /**2.4 六边形操作 (二)*/
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h4);
        disruptor.after(h1).handleEventsWith(h2);
        disruptor.after(h4).handleEventsWith(h5);
        disruptor.after(h2, h5).handleEventsWith(h3);
        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);
        long begin = System.currentTimeMillis();
        es1.submit(new TradePushlisher(latch, disruptor));
        latch.await();    //进行向下
        disruptor.shutdown();
        es.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

Disruptor - 多消费者模型讲解

workerPool(final RingBuffer<T> ringBuffer,
           final SequenceBarrier sequenceBarrier,
           final ExceptionHandler<? super T> exceptionHandler,
           final WorkHandler<? super T>... workHandlers)
public class Main {
    public static void main(String[] args) throws InterruptedException {
        //1 创建RingBuffer
        RingBuffer<Order> ringBuffer =
                RingBuffer.create(ProducerType.MULTI,
                        Order::new,
                        1024 * 1024,
                        new YieldingWaitStrategy());

        //2 通过ringBuffer 创建一个屏障
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //3 创建多个消费者数组:
        Consumer[] consumers = new Consumer[10];
        IntStream.range(0, consumers.length).forEach(i -> {
            consumers[i] = new Consumer("C" + i);
        });

        //4 构建多消费者工作池
        WorkerPool<Order> workerPool = new WorkerPool<Order>(
                ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                consumers);

        //5 设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        //6 启动workerPool
        workerPool.start(Executors.newFixedThreadPool(5));

        final CountDownLatch latch = new CountDownLatch(1);
        IntStream.range(0, 100).forEach(i -> {
            final Producer producer = new Producer(ringBuffer);
            CompletableFuture.runAsync(() -> {
                try {
                    latch.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                IntStream.range(0, 100).forEach(j -> {
                    producer.sendData(UUID.randomUUID().toString());
                });
            });
        });

        Thread.sleep(2000);
        System.err.println("----------线程创建完毕,开始生产数据----------");
        latch.countDown();

        Thread.sleep(8000);

        System.err.println("任务总数:" + consumers[2].getCount());
    }
}

Disruptor源码分析

Disruptor为何底层性能如此牛?

高性能之道 - 数据结构 - 内存预加载机制

高性能之道 - 内核 - 使用单线程写

高性能之道 - 系统内存优化 - 内存屏障

Linux内核源码 - https://github.com/opennetworklinux/linux-3.8.13/blob/master/kernel/kfifo.c

高性能之道 - 系统缓存优化 - 消除伪共享

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding { ... }

JDK1.8中增加了Contended注解方式来解决缓存伪共享问题。

在JDK1.8中,新增了一种注解@sun.misc.Contended,来使各个变量在Cache line中分隔开。注意,jvm需要添加参数-XX:-RestrictContended才能开启此功能

上一篇下一篇

猜你喜欢

热点阅读