解读Disruptor系列

解读Disruptor系列--解读源码(2)之生产者

2017-09-07  本文已影响159人  coder_jerry

之前我们一起分析了Disruptor的初始化和启动代码,接下来我们来分析下生产者的发布代码。还不太了解的同学建议看看我之前发的Disruptor原理翻译和导读文章,尤其是一些名词概念最好要清楚是做什么用的。

1 生产者线程

生产者一般就是我们的应用线程,在发布通常使用一个EventTranslator将数据转移到RingBuffer上,因为不涉及共享数据和实例变量,通常使用同一个EventTranslator实例进行操作(注:translate经常是“翻译”的意思,但其实还有“ move from one place or condition to another.”的转移、转换的意思)。
根据同一事件传入参数的多少,可以选择不同接口接收参数。

image.png
/**
* 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{

  static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();

  @Override
  public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
    event.data = arg0 ;
    System.err.println("put data "+sequence+", "+event+", "+arg0);
  }
}

// 生产线程0
Thread produceThread0 = new Thread(new Runnable() {
  @Override
  public void run() {
    int x = 0;
    while(x++ < events / 2){ // 除了下面这行代码,其他都没有关系
      disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
    }
  }
});
// 生产线程1
Thread produceThread1 = new Thread(new Runnable() {
  @Override
  public void run() {
    int x = 0;
    while(x++ < events / 2){
      disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
    }
  }
});

produceThread0.start();
produceThread1.start();

在demo中,我们实例化并启动了两个线程,用来生产事件放置到Disruptor中。
接下来我们跟随源码一点点深入。

2 生产事件的整体逻辑

// Disruptor.java
/**
* Publish an event to the ring buffer. 使用给定的事件翻译器,发布事件
*
* @param eventTranslator the translator that will load data into the event.
* @param arg            A single argument to load into the event
*/
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
{
    ringBuffer.publishEvent(eventTranslator, arg);
}
之前也讲过,Disruptor这个类是一个辅助类,在发布事件时其实是委托给RingBuffer完成发布操作。
RingBuffer.publishEvent()的逻辑大概分为两个步骤:第一步先占有RingBuffer上的一个可用位置,我们简称为“占坑”;第二步在可用位置发布数据,我们简称为“填坑”。
// RingBuffer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
    final long sequence = sequencer.next(); // 第一步 占坑
    translateAndPublish(translator, sequence, arg0); // 第二步 填坑
}

其中第二步中,在填坑完毕还要调用Sequencer接口的publish方法对外发布事件。为啥呢?先留个疑问。
在第一步占坑中,首先通过调用Sequencer.next()获取RingBuffer实例下一个能用的序号。
AbstractSequencer作为一个抽象类,实现了Sequencer接口,是单生产者Sequencer和多生产者Sequencer的父类。

3 Disruptor的核心--Sequencer接口

为什么说Sequencer是Disruptor的核心呢?其实这也不是我说的,是Disruptor官方Wiki Introduction上说的:

image.png
Sequencer是用来保证生产者和消费者之间正确、高速传递数据的。我们先来看看以生产者的角度看Sequencer有什么作用。
先来张类图。
Sequencer类图
下边是Sequencer接口及其父接口Cursored、Sequenced 定义。
// Sequencer
/**
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
*/
public interface Sequencer extends Cursored, Sequenced
{
    /**
    * Set to -1 as sequence starting point
    * 序号开始位置
    */
    long INITIAL_CURSOR_VALUE = -1L;

    /**
    * Claim a specific sequence.  Only used if initialising the ring buffer to
    * a specific value.
    *
    * @param sequence The sequence to initialise too.
    * 声明指定序号,只用在初始化RingBuffer到指定值,基本上不用了
    */
    void claim(long sequence);

    /**
    * Confirms if a sequence is published and the event is available for use; non-blocking.
    *
    * @param sequence of the buffer to check
    * @return true if the sequence is available for use, false if not
    * 用非阻塞方式,确认某个序号是否已经发布且事件可用。
    */
    boolean isAvailable(long sequence);

    /**
    * Add the specified gating sequences to this instance of the Disruptor.  They will
    * safely and atomically added to the list of gating sequences.
    *
    * @param gatingSequences The sequences to add.
    * 增加门控序列(消费者序列),用于生产者在生产时避免追尾消费者
    */
    void addGatingSequences(Sequence... gatingSequences);

    /**
    * Remove the specified sequence from this sequencer.
    *
    * @param sequence to be removed.
    * @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
    * 从门控序列中移除指定序列
    */
    boolean removeGatingSequence(Sequence sequence);

    /**
    * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
    * are available to be read from the ring buffer given a list of sequences to track.
    *
    * @param sequencesToTrack
    * @return A sequence barrier that will track the specified sequences.
    * @see SequenceBarrier
    * 消费者使用,用于追踪指定序列(通常是上一组消费者的序列)
    */
    SequenceBarrier newBarrier(Sequence... sequencesToTrack);

    /**
    * Get the minimum sequence value from all of the gating sequences
    * added to this ringBuffer.
    *
    * @return The minimum gating sequence or the cursor sequence if
    * no sequences have been added.
    * 获取追踪序列中最小的序列
    */
    long getMinimumSequence();

    /**
    * Get the highest sequence number that can be safely read from the ring buffer.  Depending
    * on the implementation of the Sequencer this call may need to scan a number of values
    * in the Sequencer.  The scan will range from nextSequence to availableSequence.  If
    * there are no available values <code>>= nextSequence</code> the return value will be
    * <code>nextSequence - 1</code>.  To work correctly a consumer should pass a value that
    * is 1 higher than the last sequence that was successfully processed.
    *
    * @param nextSequence      The sequence to start scanning from.
    * @param availableSequence The sequence to scan to.
    * @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
    *
    * 获取能够从环形缓冲读取的最高的序列号。依赖Sequencer的实现,可能会扫描Sequencer的一些值。扫描从nextSequence
    * 到availableSequence。如果没有大于等于nextSequence的可用值,返回值将为nextSequence-1。为了工作正常,消费者
    * 应该传递一个比最后成功处理的序列值大1的值。
    */
    long getHighestPublishedSequence(long nextSequence, long availableSequence);

    <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}

// Cursored.java
/**
* Implementors of this interface must provide a single long value
* that represents their current cursor value.  Used during dynamic
* add/remove of Sequences from a
* {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
* 游标接口,用于获取生产者当前游标位置
*/
public interface Cursored
{
    /**
    * Get the current cursor value.
    *
    * @return current cursor value
    */
    long getCursor();
}

// Sequenced.java
public interface Sequenced
{
    /**
    * The capacity of the data structure to hold entries.
    *
    * @return the size of the RingBuffer.
    * 获取环形缓冲的大小
    */
    int getBufferSize();

    /**
    * Has the buffer got capacity to allocate another sequence.  This is a concurrent
    * method so the response should only be taken as an indication of available capacity.
    *
    * @param requiredCapacity in the buffer
    * @return true if the buffer has the capacity to allocate the next sequence otherwise false.
    * 判断是否含有指定的可用容量
    */
    boolean hasAvailableCapacity(final int requiredCapacity);

    /**
    * Get the remaining capacity for this sequencer.
    *
    * @return The number of slots remaining.
    * 剩余容量
    */
    long remainingCapacity();

    /**
    * Claim the next event in sequence for publishing.
    *
    * @return the claimed sequence value
    * 生产者发布时,申请下一个序号
    */
    long next();

    /**
    * Claim the next n events in sequence for publishing.  This is for batch event producing.  Using batch producing
    * requires a little care and some math.
    * <pre>
    * int n = 10;
    * long hi = sequencer.next(n);
    * long lo = hi - (n - 1);
    * for (long sequence = lo; sequence <= hi; sequence++) {
    *    // Do work.
    * }
    * sequencer.publish(lo, hi);
    * </pre>
    *
    * @param n the number of sequences to claim
    * @return the highest claimed sequence value
    * 申请n个序号,用于批量发布
    */
    long next(int n);

    /**
    * Attempt to claim the next event in sequence for publishing.  Will return the
    * number of the slot if there is at least <code>requiredCapacity</code> slots
    * available.
    *
    * @return the claimed sequence value
    * @throws InsufficientCapacityException
    * next()的非阻塞模式
    */
    long tryNext() throws InsufficientCapacityException;

    /**
    * Attempt to claim the next n events in sequence for publishing.  Will return the
    * highest numbered slot if there is at least <code>requiredCapacity</code> slots
    * available.  Have a look at {@link Sequencer#next()} for a description on how to
    * use this method.
    *
    * @param n the number of sequences to claim
    * @return the claimed sequence value
    * @throws InsufficientCapacityException
    * next(n)的非阻塞模式
    */
    long tryNext(int n) throws InsufficientCapacityException;

    /**
    * Publishes a sequence. Call when the event has been filled.
    *
    * @param sequence
    * 数据填充后,发布此序号
    */
    void publish(long sequence);

    /**
    * Batch publish sequences.  Called when all of the events have been filled.
    *
    * @param lo first sequence number to publish
    * @param hi last sequence number to publish
    * 批量发布序号
    */
    void publish(long lo, long hi);
}

3.1 单生产者发布事件

下边先看使用单生产者SingleProducerSequencer具体是怎么占坑的。

// SingleProducerSequencer.java
@Override
public long next()
{
    return next(1);
}

/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }
    // 复制上次申请完毕的序列值
    long nextValue = this.nextValue;
    // 加n,得到本次需要申请的序列值,单个发送n为1
    long nextSequence = nextValue + n; // 本次要验证的值
    // 可能发生绕环的点,本次申请值 - 一圈长度
    long wrapPoint = nextSequence - bufferSize; // 400米跑到,小明跑了599米,小红跑了200米。小红不动,小明再跑一米就撞翻小红的那个点,叫做绕环点wrapPoint。
    long cachedGatingSequence = this.cachedValue; // 数值最小的序列值,也就是最慢消费者
    // wrapPoint 等于 cachedGatingSequence 将发生绕环行为,生产者将在环上,从后方覆盖未消费的事件。
    // 如果即将生产者超一圈从后方追消费者尾(要申请的序号落了最慢消费者一圈)或 消费者追生产者尾,将进行等待。后边这种情况应该不会发生吧?
    // 没有空坑位,将进入循环等待。
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        cursor.setVolatile(nextValue);  // StoreLoad fence

        long minSequence;
        // 只有当消费者消费,向前移动后,才能跳出循环
        // 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {  // 环形等待的消费者
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        // 当消费者向前消费后,更新缓存的最小序号
        this.cachedValue = minSequence;
    }
    // 将成功申请的序号赋值给对象实例变量
    this.nextValue = nextSequence;

    return nextSequence;
}

next()占坑成功将会返回坑位号,回到RingBuffer的publishEvent方法,执行translateAndPublish方法,进行填坑和发布操作。

// RingBuffer.java
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
    try
    {
        translator.translateTo(get(sequence), sequence);
    }
    finally
    {
        sequencer.publish(sequence);
    }
}

translator参数用户定义的对EventTranslator接口的实现对象。
上文已经介绍过EventTranslator接口,除EventTranslator外,还有EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg。功能是将给定的数据填充到指定坑位的对象(因为RingBuffer上已经预先分配了对象)上,只不过分别对应不同参数。简单看下EventTranslatorOneArg接口定义。

public interface EventTranslatorOneArg<T, A>
{
  /**
  * Translate a data representation into fields set in given event
  *
  * @param event into which the data should be translated.
  * @param sequence that is assigned to event.
  * @param arg0 The first user specified argument to the translator
  */
  void translateTo(final T event, long sequence, final A arg0);
}

在放好数据后,就可以调用sequencer的publish方法发布对象了。首先是更新当前游标,更新完毕再通知等待中的消费者,消费者将继续消费。关于消费者的等待策略,后续还会讲到。

// SingleProducerSequencer.java 
@Override
public void publish(long sequence)
{  // 在发布此位置可用时,需要更新Sequencer内部游标值,并在使用阻塞等待策略时,通知等待可用事件的消费者进行继续消费
    cursor.set(sequence);
    // 除signalAllWhenBlocking外都是空实现
    waitStrategy.signalAllWhenBlocking();
}

// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking()
{
    lock.lock();
    try
    {
        processorNotifyCondition.signalAll();
    }
    finally
    {
        lock.unlock();
    }
}

3.2 插播Disruptor中的高效AtomicLong--Sequence

注意那个cursor,这个cursor可不是简单的long类型,而是Disruptor内部实现的Sequence类。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{   // value的前后各有7个long变量,用于缓存行填充,前后各7个保证了不管怎样,当64位的缓存行加载时value,不会有其他变量共享缓存行,从而解决了伪共享问题
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 *
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 * Sequence可以按照AtomicLong来理解,除了Sequence消除了伪共享问题,更加高效
 */
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Create a sequence initialised to -1.
     */
    public Sequence()
    {
        this(INITIAL_VALUE);
    }

    /**
     * Create a sequence with a specified initial value.
     *
     * @param initialValue The initial value for this sequence.
     */
    public Sequence(final long initialValue)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    /**
     * Perform a volatile read of this sequence's value.
     *
     * @return The current value of the sequence.
     */
    public long get()
    {
        return value;
    }

    /**
     * Perform an ordered write of this sequence.  The intent is
     * a Store/Store barrier between this write and any previous
     * store.
     *
     * @param value The new value for the sequence.
     * 此方法等同于AtomicLong#lazySet(long newValue),
     * 和直接修改volatile修饰的value相比,非阻塞,更高效,但更新的值会稍迟一点看到
     */
    public void set(final long value)
    {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * Performs a volatile write of this sequence.  The intent is
     * a Store/Store barrier between this write and any previous
     * write and a Store/Load barrier between this write and any
     * subsequent volatile read.
     *
     * @param value The new value for the sequence.
     */
    public void setVolatile(final long value)
    {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /**
     * Perform a compare and set operation on the sequence.
     *
     * @param expectedValue The expected current value.
     * @param newValue The value to update to.
     * @return true if the operation succeeds, false otherwise.
     */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    /**
     * Atomically increment the sequence by one.
     *
     * @return The value after the increment
     */
    public long incrementAndGet()
    {
        return addAndGet(1L);
    }

    /**
     * Atomically add the supplied value.
     *
     * @param increment The value to add to the sequence.
     * @return The value after the increment.
     */
    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;

        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));

        return newValue;
    }

    @Override
    public String toString()
    {
        return Long.toString(get());
    }
}

这个Sequence其实相当于AtomicLong,最大的区别在于Sequence解决了伪共享问题。另外Sequence#set相当于AtomicLong#lazySet。
致此,使用单生产者发布事件的流程就完成了。

3.3 多生产者发布事件

如果使用的是多生产者,占坑则调用MultiProducerSequencer.next()。

@Override
public long next()
{
    return next(1);
}

/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        current = cursor.get(); // 当前游标值,初始化时是-1
        next = current + n;

        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            if (wrapPoint > gatingSequence)
            {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }

            gatingSequenceCache.set(gatingSequence);
        }
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);

    return next;
}

可以发现,多生产者模式占坑和放置数据的逻辑和单生产者模式区别不大。区别主要是最后调用publish发布坑位的逻辑。

// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // 获取int[]数组类的第一个元素与该类起始位置的偏移。
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // 每个元素需要占用的位置,也有可能返回0。BASE和SCALE都是为了操作availableBuffer

private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // 初始全是-1
private final int indexMask;
private final int indexShift;

@Override
public void publish(final long sequence)
{
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking(); // 如果使用BlokingWaitStrategy,才会进行通知。否则不会操作
}

@Override
public void publish(long lo, long hi)
{
    for (long l = lo; l <= hi; l++)
    {
        setAvailable(l);
    }
    waitStrategy.signalAllWhenBlocking();
}
/
* availableBuffer设置可用标志
* 主要原因是避免发布者线程之间共享一个序列对象。
* 游标和最小门控序列的差值应该永远不大于RingBuffer的大小(防止生产者太快,覆盖未消费完的数据)
*/
private void setAvailable(final long sequence)
{ // calculateIndex 求模%, calculateAvailabilityFlag 求除/
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

private void setAvailableBufferValue(int index, int flag)
{  // 使用Unsafe更新属性,因为是直接操作内存,所以需要计算元素位置对应的内存位置bufferAddress
    long bufferAddress = (index * SCALE) + BASE;
    // availableBuffer是标志可用位置的int数组,初始全为-1。随着sequence不断上升,buffer中固定位置的flag(也就是sequence和bufferSize相除的商)会一直增大。
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

private int calculateAvailabilityFlag(final long sequence)
{ // 求商 就是 sequence / bufferSize , bufferSize = 2^indexShift。
    return (int) (sequence >>> indexShift);
}

private int calculateIndex(final long sequence)
{ // 计算位置即求模,直接使用序号 与 掩码(2的平方-1,也就是一个全1的二进制表示),相当于 sequence % (bufferSize), bufferSize = indexMask + 1
    return ((int) sequence) & indexMask;
}

对比SingleProducerSequencer的publish,MultiProducerSequencer的publish没有设置cursor,而是将内部使用的availableBuffer数组对应位置进行设置。availableBuffer是一个记录RingBuffer槽位状态的数组,通过对序列值sequence取ringBuffer大小的模,获得槽位号,再通过与ringBuffer大小相除,获取序列值所在的圈数,进行设置。这里没有直接使用模运算和触发运算,而使用更高效的位与和右移操作。
其他的操作,MultiProducerSequencer和SingleProducerSequencer类似,就不再赘述了。

4 剖析SingleProducerSequencer设计

上面已经把Disruptor的主要发布事件流程过了一遍,好奇如你,必然觉得意犹未尽。如果你没有,那肯定还是我讲的有问题,不代表Disruptor本身的精彩。
接下来说一说SingleProducerSequencer的设计。从中我们可以看到Disruptor解决伪共享问题的实际代码。
SingleProducerSequencer继承了抽象类SingleProducerSequencerFields,SingleProducerSequencerFields又继承了抽象类SingleProducerSequencerPad。其中SingleProducerSequencerFields是实际放置有效实例变量的位置。

// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
    {
        super(bufferSize, waitStrategy);
    }

    /**
    * Set to -1 as sequence starting point
    */
    protected long nextValue = Sequence.INITIAL_VALUE; // 生产者申请的下一个序列值
    protected long cachedValue = Sequence.INITIAL_VALUE; // 缓存上一次比较的门控序列组和next的较小值(最慢消费者序列值)
}

/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
* <p>
* <p>Note on {@link Sequencer#getCursor()}:  With this sequencer the cursor value is updated after the call
* to {@link Sequencer#publish(long)} is made.
*/

public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
    protected long p1, p2, p3, p4, p5, p6, p7;
    // ...省略
}

可以发现,在两个实例变量前后各有7个long型变量。为什么这样做呢?对CPU缓存有了解的同学一定知道的……对,就是为了解决伪共享问题。
CPU在加载内存到缓存行时,一个缓存行中最多只有这两个有效变量,最大限度地避免了因伪共享问题,导致缓存失效,而造成性能损失。
为了更清晰地阐述这个道理,我们尝试看一下SingleProducerSequencer实例的内存布局。
使用HSDB(HotSpot Debugger,可通过 java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB 启动)跟踪demo对应的已断点的HotSpot进程,从Object Histogram对象图中筛选出SingleProducerSequencer实例,并通过Inspector工具对SingleProducerSequencer实例进行查看。
本例中,0x00000000828026f8为com.lmax.disruptor.SingleProducerSequencer实例在JVM中的内存起始位置。以此内存地址通过mem命令查看后续的30个内存地址内容。为啥要30个呢?其实20个就够了,可以看到"Object Histogram"中SingleProducerSequencer实例的size是160字节,mem打印一行表示一字长,对应到我本机的64位机器即8字节,所以长度选择大于等于160/8=20就可以看到SingleProducerSequencer实例的内存布局全貌。

hsdb

左侧红框中的地址0x0000000082802750和0x0000000082802758分别对应右侧红框中的nextValue和cachedValue两个实例变量。而在它们前后,各有7个连续的long型整数0。CPU在加载连续内存到缓存时,以缓存行为单位。缓存行通常为64B,通过占位,可以让实际变量独享一个缓存行。从而解决了伪共享问题。
缓存行查看:linux可使用以下命令查看。
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
windows可使用CPU-Z查看。

cpu-z

附录:JAVA对象的内存布局相关知识

最后再说点Java对象的内存布局,和本文主题关系不大,可以略过。
HotSpot对象内存布局
HotSpot中一个对象(非数组)的内存布局大概是这样的:对象头(Mark Word + klass pointer) + 实际数据 + 为了保持8字节对齐的填充。其中对象头的Mark Word和klass pointer长度各为一机器字(machine-word),即32位机器对应32bit(4字节),64位机器对应64bit(8字节)。如64位JVM开启了指针压缩,klass pointer将压缩到4字节。
查看是否开启了指针压缩
jinfo -flag UseCompressedOops pid 返回-XX:+UseCompressedOops即为开启,或jinfo -flags pid 查看全部选项。
此例中返回了-XX:+UseCompressedOops,表示开启了指针压缩(jdk1.8默认开启)。此时普通类型指针将被压缩为4字节。
下面通过SingleProducerSequencer举一个实际的例子。
SingleProducerSequencer属性

image.png

使用HSDB Inspector查看实例。

image.png

查看对象内存内容

hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word 存储对象运行时数据,如哈希码、GC分代年龄、锁状态标志、线程持有锁、偏向线程ID、偏向时间戳
0x0000000082802700: 0x000000082000de38 // 高4位(82802704~82802707):int bufferSize 8 ,低4位(82802700~8280273):2000de38。由于开启了指针压缩,低4位表示klass pointer,由于使用的JDK1.8,klass metadata保存在Metadataspace中。
0x0000000082802708: 0x828028e082809e98 // 高4位:ref cursor,低4位: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // ref gatingSequences ObjArray
0x0000000082802718: 0x0000000000000000 // 包括当前行的以下7行 SingleProducerSequencerPad中定义的p1~p7
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue 1
0x0000000082802758: 0xffffffffffffffff // cachedValue -1
0x0000000082802760: 0x0000000000000000 // SingleProducerSequencer定义的p1~p7
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000

计算此对象的Shallow Heap size 和 Retained Heap size
可以发现此对象一共占用20*8=160B内存,此值即Shallow Heap size。也可以手工计算:mark_word[8] + klass_pointer[4] + 2 * ref[4] + ObjArray_ref[8] + 16 * long[8] + int[4] = 160B
而保留内存大小Retained Heap size = Shallow Heap size + (当前对象的引用对象排除GC Root引用对象)的Shallow Heap size。
这里涉及到的引用为:cursor 0x00000000828028e0 ,waitStrategy 0x0000000082809e98 ,gatingSequences 0x000000008284b390。

分别使用revptrs命令查找反向引用,发现只有gatingSequences为此对象唯一引用,故计算gatingSequences(com.lmax.disruptor.Sequence[1] ) Shallow Heap size = 12 + 4 + 1 * 4 + 4 = 24B。这里由于开启了压缩指针,引用指针占用4B,此时占用20B,需要填充4B补满24B。故对象的Retained Heap size为160+24=184。

hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0

数组对象的Shallow Heap size=引用对象头大小12字节+存储数组长度的空间大小4字节+数组的长度*数组中对象的Shallow Heap size+padding大小

最后还有个问题,我们知道从Java8开始,Metaspace替代之前的PermGen存储元信息。使用Java7的HSDB是可以通过universe命令查看到PermGen信息的,而Java8就查不到Metaspace信息。

Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]

Disruptor生产者相关源码就分享到这,后续将对消费者一探究竟。


参考资料

  1. Java对象内存布局(推荐,写的很棒) http://www.jianshu.com/p/91e398d5d17c
  2. JVM——深入分析对象的内存布局 http://www.cnblogs.com/zhengbin/p/6490953.html
  3. 借HSDB来探索HotSpot VM的运行时数据 http://rednaxelafx.iteye.com/blog/1847971
  4. markOop.hpp https://github.com/dmlloyd/openjdk/blob/jdk8u/jdk8u/hotspot/src/share/vm/oops/markOop.hpp
  5. Shallow and retained sizes http://toolkit.globus.org/toolkit/testing/tools/docs/help/sizes.html
  6. AtomicLong.lazySet是如何工作的? http://ifeve.com/how-does-atomiclong-lazyset-work/
  7. 《深入理解Java虚拟机》2.3.2 对象的内存布局
上一篇下一篇

猜你喜欢

热点阅读