Java学习并发插件框架

Disruptor详解

2019-10-04  本文已影响0人  tracy_668

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。

disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
 官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。

队列

队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取。在计算中队列一般用来做排队(如线程池的等待排队,锁的等待排队),用来做解耦(生产者消费者模式),异步等等

在jdk中的队列都实现了java.util.Queue接口,在队列中又分为两类,一类是线程不安全的,ArrayDeque,LinkedList等等,还有一类都在java.util.concurrent包下属于线程安全,而在我们真实的环境中,我们的机器都是属于多线程,当多线程对同一个队列进行排队操作的时候,如果使用线程不安全会出现,覆盖数据,数据丢失等无法预测的事情,所以我们这个时候只能选择线程安全的队列。
其次还剩下ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表,在队列中,一般获取这个队列元素之后紧接着会获取下一个元素,或者一次获取多个队列元素都有可能,而数组在内存中地址是连续的,在操作系统中会有缓存的优化(下面也会介绍缓存行),所以访问的速度会略胜一筹,我们也会尽量去选择ArrayBlockingQueue。而事实证明在很多第三方的框架中,比如早期的log4j异步,都是选择的ArrayBlockingQueue。

在jdk中提供的线程安全的队列下面简单列举部分队列:

image.png

我们可以看见,我们无锁的队列是无界的,有锁的队列是有界的,这里就会涉及到一个问题,我们在真正的线上环境中,无界的队列,对我们系统的影响比较大,有可能会导致我们内存直接溢出,所以我们首先得排除无界队列,当然并不是无界队列就没用了,只是在某些场景下得排除。其次还剩下ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表。
(LinkedBlockingQueue 其实也是有界队列,但是不设置大小时就时Integer.MAX_VALUE),ArrayBlockingQueue,LinkedBlockingQueue也有自己的弊端,就是性能比较低,为什么jdk会增加一些无锁的队列,其实就是为了增加性能,很苦恼,又需要无锁,又需要有界,答案就是Disruptor

Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,并且是一个开源的并发框架,并获得2011Duke’s程序框架创新奖。能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在内部集成了Disruptor用来替代jdk的队列,以此来获得高性能。

为什么这么牛逼?

在Disruptor中有三大杀器:

3.1.1锁和CAS

我们ArrayBlockingQueue为什么会被抛弃的一点,就是因为用了重量级lock锁,在我们加锁过程中我们会把锁挂起,解锁后,又会把线程恢复,这一过程会有一定的开销,并且我们一旦没有获取锁,这个线程就只能一直等待,这个线程什么事也不能做。

CAS(compare and swap),顾名思义先比较在交换,一般是比较是否是老的值,如果是的进行交换设置,大家熟悉乐观锁的人都知道CAS可以用来实现乐观锁,CAS中没有线程的上下文切换,减少了不必要的开销
而我们的Disruptor也是基于CAS。

3.1.2伪共享

到了伪共享就不得不说计算机CPU缓存,缓存大小是CPU的重要指标之一,而且缓存的结构和大小对CPU速度的影响非常大,CPU内缓存的运行频率极高,一般是和处理器同频运作,工作效率远远大于系统内存和硬盘。实际工作时,CPU往往需要重复读取同样的数据块,而缓存容量的增大,可以大幅度提升CPU内部读取数据的命中率,而不用再到内存或者硬盘上寻找,以此提高系统性能。但是从CPU芯片面积和成本的因素来考虑,缓存都很小。

image.png

CPU缓存可以分为一级缓存,二级缓存,如今主流CPU还有三级缓存,甚至有些CPU还有四级缓存。每一级缓存中所储存的全部数据都是下一级缓存的一部分,这三种缓存的技术难度和制造成本是相对递减的,所以其容量也是相对递增的。

每一次你听见intel发布新的cpu什么,比如i7-7700k,8700k,都会对cpu缓存大小进行优化,感兴趣可以自行下来搜索,这些的发布会或者发布文章。

Martin和Mike的 QConpresentation演讲中给出了一些每个缓存时间:

image.png
缓存行

在cpu的多级缓存中,并不是以独立的项来保存的,而是类似一种pageCahe的一种策略,以缓存行来保存,而缓存行的大小通常是64字节,在Java中Long是8个字节,所以可以存储8个Long,举个例子,你访问一个long的变量的时候,他会把帮助再加载7个,我们上面说为什么选择数组不选择链表,也就是这个原因,在数组中可以依靠缓冲行得到很快的访问。

image.png

缓存行是万能的吗?NO,因为他依然带来了一个缺点,我在这里举个例子说明这个缺点,可以想象有个数组队列,ArrayQueue,他的数据结构如下:

class ArrayQueue{
    long maxSize;
    long currentIndex;
}

对于maxSize是我们一开始就定义好的,数组的大小,对于currentIndex,是标志我们当前队列的位置,这个变化比较快,可以想象你访问maxSize的时候,是不是把currentIndex也加载进来了,这个时候,其他线程更新currentIndex,就会把cpu中的缓存行置位无效,请注意这是CPU规定的,他并不是只吧currentIndex置位无效,如果此时又继续访问maxSize他依然得继续从内存中读取,但是MaxSize却是我们一开始定义好的,我们应该访问缓存即可,但是却被我们经常改变的currentIndex所影响。

image.png
Padding的魔法

为了解决上面缓存行出现的问题,在Disruptor中采用了Padding的方式,

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

其中的Value就被其他一些无用的long变量给填充了。这样你修改Value的时候,就不会影响到其他变量的缓存行。

最后顺便一提,在jdk8中提供了@Contended的注解,当然一般来说只允许Jdk中内部,如果你自己使用那就得配置Jvm参数 -RestricContentended = fase,将限制这个注解置位取消。很多文章分析了ConcurrentHashMap,但是都把这个注解给忽略掉了,在ConcurrentHashMap中就使用了这个注解,在ConcurrentHashMap每个桶都是单独的用计数器去做计算,而这个计数器由于时刻都在变化,所以被用这个注解进行填充缓存行优化,以此来增加性能。


image.png

下面的例子是测试利用cache line的特性和不利用cache line的特性的效果对比.

public class CacheLineEffect {
    //考虑一般缓存行大小是64字节, 一个 long 类型占8字节
    static  long[][] arr;

    public static void main(String[] args) {
        arr = new long[1024 * 1024][];
        for (int i = 0; i < 1024 * 1024; i++) {
            arr[i] = new long[8];
            for (int j = 0; j < 8; j++) {
                arr[i][j] = 0L;
            }
        }
        long sum = 0L;
        long marked = System.currentTimeMillis();
        for (int i = 0; i < 1024 * 1024; i+=1) {
            for(int j =0; j< 8;j++){
                sum = arr[i][j];
            }
        }
        System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

        marked = System.currentTimeMillis();
        for (int i = 0; i < 8; i+=1) {
            for(int j =0; j< 1024 * 1024;j++){
                sum = arr[j][i];
            }
        }
        System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
    }
}
image.png

什么是伪共享

ArrayBlockingQueue有三个成员变量:

takeIndex: 需要被取走的元素下标
putIndex: 可被元素插入的位置的下标
count: 队列中元素的数量

这三个变量很容易放到一个缓存行中, 但是之间修改没有太多的关联. 所以每次修改, 都会使之前缓存的数据失效, 从而不能完全达到共享的效果.

image.png

如上图所示, 当生产者线程put一个元素到ArrayBlockingQueue时, putIndex会修改, 从而导致消费者线程的缓存中的缓存行无效, 需要从主存中重新读取.

这种无法充分使用缓存行特性的现象, 称为伪共享

3.1.3RingBuffer

ringbuffer到底是什么
它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。

image.png

基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(如下图右边的图片表示序号,这个序号指向数组的索引4的位置。)

image.png
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。 image.png

要找到数组中当前序号指向的元素,可以通过sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

常用的队列之间的区别

ringbuffer采用这种数据结构原因

如何从Ringbuffer读取
image.png

消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。

消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12。ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号.

接下来

接下来,消费者会一直逛来逛去,等待更多数据被写入 Ring Buffer。并且,写入数据后消费者会收到通知——节点 9,10,11 和 12 已写入。现在序号 12 到了,消费者可以指示 ConsumerBarrier 去拿这些序号里的数据了。

image.png

在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。

实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。
如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

image.png

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

下面忽略数组的环形结构, 介绍一下如何实现无锁设计. 整个过程通过原子变量CAS, 保证操作的线程安全.

一个生产者

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 这儿主要判断是否会覆盖未读的元素
  3. 若是返回的正确, 则生产者开始写入元素.


    image.png

多个生产者
多个生产者的情况下, 会遇到“如何防止多个线程重复写同一个元素”的问题. Disruptor的解决方法是, 每个线程获取不同的一段数组空间进行操作. 这个通过CAS很容易达到. 只需要在分配元素的时候, 通过CAS判断一下这段空间是否已经分配出去即可.

但是会遇到一个新问题: 如何防止读取的时候, 读到还未写的元素. Disruptor在多个生产者的情况下, 引入了一个与Ring Buffer大小相同的buffer: available Buffer. 当某个位置写入成功的时候, 便把availble Buffer相应的位置置位, 标记为写入成功. 读取的时候, 会遍历available Buffer, 来判断元素是否已经就绪.

读数据
生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n, 这时仍然无法确定连续可读的最大下标. 从reader cursor开始读取available Buffer, 一直查到第一个不可用的元素, 然后返回最大连续可读元素的位置;
  3. 消费者读取元素.

如下图所示, 读线程读到下标为2的元素, 三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据, 写线程被分配到的最大元素下标是11.
读线程申请读取到下标从3到11的元素, 判断writer cursor>=11. 然后开始读取availableBuffer, 从3开始, 往后读取, 发现下标为7的元素没有生产成功, 于是WaitFor(11)返回6.

然后, 消费者读取下标从3到6共计4个元素.

image.png

写数据
多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入, 则返回最大的序列号. 每个生产者会被分配一段独享的空间;
  3. 生产者写入元素, 写入元素的同时设置available Buffer里面相应的位置, 以标记自己哪些位置是已经写入成功的.
    如下图所示, Writer1和Writer2两个线程写入数组, 都申请可写的数组空间. Writer1被分配了下标3到下表5的空间, Writer2被分配了下标6到下标9的空间.

Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位, 标记已经写入成功, 往后移一位, 开始写下标4位置的元素. Writer2同样的方式. 最终都写入完成.


image.png

防止不同生产者对同一段空间写入的代码, 如下所示:

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));

    return next;
}

通过do/while循环的条件cursor.compareAndSet(current, next), 来判断每次申请的空间是否已经被其他生产者占据. 假如已经被占据, 该函数会返回失败, While循环重新执行, 申请写入空间.

消费者的流程与生产者非常类似, 这儿就不多描述了. Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能.

3.2Disruptor怎么使用

package concurrent;

import sun.misc.Contended;

import java.util.concurrent.ThreadFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * @Description:
 * @Created on 2019-10-04
 */
public class DisruptorTest {
    public static void main(String[] args) throws Exception {
        // 队列中的元素
        class Element {
            @Contended
            private String value;


            public String getValue() {
                return value;
            }

            public void setValue(String value) {
                this.value = value;
            }
        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            int i = 0;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread" + String.valueOf(i++));
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {
                System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence);
//                Thread.sleep(10000000);
            }
        };


        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 8;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((element, sequence) -> {
                System.out.println("之前的数据" + element.getValue() + "当前的sequence" + sequence);
                element.setValue("我是第" + sequence + "个");
            });

        }
    }
}

在Disruptor中有几个比较关键的:

  1. BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
  2. BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
  3. YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu
上一篇下一篇

猜你喜欢

热点阅读