解读Disruptor系列--解读源码(2)之生产者
之前我们一起分析了Disruptor的初始化和启动代码,接下来我们来分析下生产者的发布代码。还不太了解的同学建议看看我之前发的Disruptor原理翻译和导读文章,尤其是一些名词概念最好要清楚是做什么用的。
1 生产者线程
生产者一般就是我们的应用线程,在发布通常使用一个EventTranslator将数据转移到RingBuffer上,因为不涉及共享数据和实例变量,通常使用同一个EventTranslator实例进行操作(注:translate经常是“翻译”的意思,但其实还有“ move from one place or condition to another.”的转移、转换的意思)。
根据同一事件传入参数的多少,可以选择不同接口接收参数。
/**
* 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
@Override
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
}
}
// 生产线程0
Thread produceThread0 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){ // 除了下面这行代码,其他都没有关系
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
// 生产线程1
Thread produceThread1 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
produceThread0.start();
produceThread1.start();
在demo中,我们实例化并启动了两个线程,用来生产事件放置到Disruptor中。
接下来我们跟随源码一点点深入。
2 生产事件的整体逻辑
// Disruptor.java
/**
* Publish an event to the ring buffer. 使用给定的事件翻译器,发布事件
*
* @param eventTranslator the translator that will load data into the event.
* @param arg A single argument to load into the event
*/
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
{
ringBuffer.publishEvent(eventTranslator, arg);
}
之前也讲过,Disruptor这个类是一个辅助类,在发布事件时其实是委托给RingBuffer完成发布操作。
RingBuffer.publishEvent()的逻辑大概分为两个步骤:第一步先占有RingBuffer上的一个可用位置,我们简称为“占坑”;第二步在可用位置发布数据,我们简称为“填坑”。
// RingBuffer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
final long sequence = sequencer.next(); // 第一步 占坑
translateAndPublish(translator, sequence, arg0); // 第二步 填坑
}
其中第二步中,在填坑完毕还要调用Sequencer接口的publish方法对外发布事件。为啥呢?先留个疑问。
在第一步占坑中,首先通过调用Sequencer.next()获取RingBuffer实例下一个能用的序号。
AbstractSequencer作为一个抽象类,实现了Sequencer接口,是单生产者Sequencer和多生产者Sequencer的父类。
3 Disruptor的核心--Sequencer接口
为什么说Sequencer是Disruptor的核心呢?其实这也不是我说的,是Disruptor官方Wiki Introduction上说的:
Sequencer是用来保证生产者和消费者之间正确、高速传递数据的。我们先来看看以生产者的角度看Sequencer有什么作用。
先来张类图。
Sequencer类图
下边是Sequencer接口及其父接口Cursored、Sequenced 定义。
// Sequencer
/**
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
*/
public interface Sequencer extends Cursored, Sequenced
{
/**
* Set to -1 as sequence starting point
* 序号开始位置
*/
long INITIAL_CURSOR_VALUE = -1L;
/**
* Claim a specific sequence. Only used if initialising the ring buffer to
* a specific value.
*
* @param sequence The sequence to initialise too.
* 声明指定序号,只用在初始化RingBuffer到指定值,基本上不用了
*/
void claim(long sequence);
/**
* Confirms if a sequence is published and the event is available for use; non-blocking.
*
* @param sequence of the buffer to check
* @return true if the sequence is available for use, false if not
* 用非阻塞方式,确认某个序号是否已经发布且事件可用。
*/
boolean isAvailable(long sequence);
/**
* Add the specified gating sequences to this instance of the Disruptor. They will
* safely and atomically added to the list of gating sequences.
*
* @param gatingSequences The sequences to add.
* 增加门控序列(消费者序列),用于生产者在生产时避免追尾消费者
*/
void addGatingSequences(Sequence... gatingSequences);
/**
* Remove the specified sequence from this sequencer.
*
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
* 从门控序列中移除指定序列
*/
boolean removeGatingSequence(Sequence sequence);
/**
* Create a new SequenceBarrier to be used by an EventProcessor to track which messages
* are available to be read from the ring buffer given a list of sequences to track.
*
* @param sequencesToTrack
* @return A sequence barrier that will track the specified sequences.
* @see SequenceBarrier
* 消费者使用,用于追踪指定序列(通常是上一组消费者的序列)
*/
SequenceBarrier newBarrier(Sequence... sequencesToTrack);
/**
* Get the minimum sequence value from all of the gating sequences
* added to this ringBuffer.
*
* @return The minimum gating sequence or the cursor sequence if
* no sequences have been added.
* 获取追踪序列中最小的序列
*/
long getMinimumSequence();
/**
* Get the highest sequence number that can be safely read from the ring buffer. Depending
* on the implementation of the Sequencer this call may need to scan a number of values
* in the Sequencer. The scan will range from nextSequence to availableSequence. If
* there are no available values <code>>= nextSequence</code> the return value will be
* <code>nextSequence - 1</code>. To work correctly a consumer should pass a value that
* is 1 higher than the last sequence that was successfully processed.
*
* @param nextSequence The sequence to start scanning from.
* @param availableSequence The sequence to scan to.
* @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
*
* 获取能够从环形缓冲读取的最高的序列号。依赖Sequencer的实现,可能会扫描Sequencer的一些值。扫描从nextSequence
* 到availableSequence。如果没有大于等于nextSequence的可用值,返回值将为nextSequence-1。为了工作正常,消费者
* 应该传递一个比最后成功处理的序列值大1的值。
*/
long getHighestPublishedSequence(long nextSequence, long availableSequence);
<T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}
// Cursored.java
/**
* Implementors of this interface must provide a single long value
* that represents their current cursor value. Used during dynamic
* add/remove of Sequences from a
* {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
* 游标接口,用于获取生产者当前游标位置
*/
public interface Cursored
{
/**
* Get the current cursor value.
*
* @return current cursor value
*/
long getCursor();
}
// Sequenced.java
public interface Sequenced
{
/**
* The capacity of the data structure to hold entries.
*
* @return the size of the RingBuffer.
* 获取环形缓冲的大小
*/
int getBufferSize();
/**
* Has the buffer got capacity to allocate another sequence. This is a concurrent
* method so the response should only be taken as an indication of available capacity.
*
* @param requiredCapacity in the buffer
* @return true if the buffer has the capacity to allocate the next sequence otherwise false.
* 判断是否含有指定的可用容量
*/
boolean hasAvailableCapacity(final int requiredCapacity);
/**
* Get the remaining capacity for this sequencer.
*
* @return The number of slots remaining.
* 剩余容量
*/
long remainingCapacity();
/**
* Claim the next event in sequence for publishing.
*
* @return the claimed sequence value
* 生产者发布时,申请下一个序号
*/
long next();
/**
* Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
* requires a little care and some math.
* <pre>
* int n = 10;
* long hi = sequencer.next(n);
* long lo = hi - (n - 1);
* for (long sequence = lo; sequence <= hi; sequence++) {
* // Do work.
* }
* sequencer.publish(lo, hi);
* </pre>
*
* @param n the number of sequences to claim
* @return the highest claimed sequence value
* 申请n个序号,用于批量发布
*/
long next(int n);
/**
* Attempt to claim the next event in sequence for publishing. Will return the
* number of the slot if there is at least <code>requiredCapacity</code> slots
* available.
*
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next()的非阻塞模式
*/
long tryNext() throws InsufficientCapacityException;
/**
* Attempt to claim the next n events in sequence for publishing. Will return the
* highest numbered slot if there is at least <code>requiredCapacity</code> slots
* available. Have a look at {@link Sequencer#next()} for a description on how to
* use this method.
*
* @param n the number of sequences to claim
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next(n)的非阻塞模式
*/
long tryNext(int n) throws InsufficientCapacityException;
/**
* Publishes a sequence. Call when the event has been filled.
*
* @param sequence
* 数据填充后,发布此序号
*/
void publish(long sequence);
/**
* Batch publish sequences. Called when all of the events have been filled.
*
* @param lo first sequence number to publish
* @param hi last sequence number to publish
* 批量发布序号
*/
void publish(long lo, long hi);
}
3.1 单生产者发布事件
下边先看使用单生产者SingleProducerSequencer具体是怎么占坑的。
// SingleProducerSequencer.java
@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// 复制上次申请完毕的序列值
long nextValue = this.nextValue;
// 加n,得到本次需要申请的序列值,单个发送n为1
long nextSequence = nextValue + n; // 本次要验证的值
// 可能发生绕环的点,本次申请值 - 一圈长度
long wrapPoint = nextSequence - bufferSize; // 400米跑到,小明跑了599米,小红跑了200米。小红不动,小明再跑一米就撞翻小红的那个点,叫做绕环点wrapPoint。
long cachedGatingSequence = this.cachedValue; // 数值最小的序列值,也就是最慢消费者
// wrapPoint 等于 cachedGatingSequence 将发生绕环行为,生产者将在环上,从后方覆盖未消费的事件。
// 如果即将生产者超一圈从后方追消费者尾(要申请的序号落了最慢消费者一圈)或 消费者追生产者尾,将进行等待。后边这种情况应该不会发生吧?
// 没有空坑位,将进入循环等待。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 只有当消费者消费,向前移动后,才能跳出循环
// 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // 环形等待的消费者
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// 当消费者向前消费后,更新缓存的最小序号
this.cachedValue = minSequence;
}
// 将成功申请的序号赋值给对象实例变量
this.nextValue = nextSequence;
return nextSequence;
}
next()占坑成功将会返回坑位号,回到RingBuffer的publishEvent方法,执行translateAndPublish方法,进行填坑和发布操作。
// RingBuffer.java
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
try
{
translator.translateTo(get(sequence), sequence);
}
finally
{
sequencer.publish(sequence);
}
}
translator参数用户定义的对EventTranslator接口的实现对象。
上文已经介绍过EventTranslator接口,除EventTranslator外,还有EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg。功能是将给定的数据填充到指定坑位的对象(因为RingBuffer上已经预先分配了对象)上,只不过分别对应不同参数。简单看下EventTranslatorOneArg接口定义。
public interface EventTranslatorOneArg<T, A>
{
/**
* Translate a data representation into fields set in given event
*
* @param event into which the data should be translated.
* @param sequence that is assigned to event.
* @param arg0 The first user specified argument to the translator
*/
void translateTo(final T event, long sequence, final A arg0);
}
在放好数据后,就可以调用sequencer的publish方法发布对象了。首先是更新当前游标,更新完毕再通知等待中的消费者,消费者将继续消费。关于消费者的等待策略,后续还会讲到。
// SingleProducerSequencer.java
@Override
public void publish(long sequence)
{ // 在发布此位置可用时,需要更新Sequencer内部游标值,并在使用阻塞等待策略时,通知等待可用事件的消费者进行继续消费
cursor.set(sequence);
// 除signalAllWhenBlocking外都是空实现
waitStrategy.signalAllWhenBlocking();
}
// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
3.2 插播Disruptor中的高效AtomicLong--Sequence
注意那个cursor,这个cursor可不是简单的long类型,而是Disruptor内部实现的Sequence类。
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{ // value的前后各有7个long变量,用于缓存行填充,前后各7个保证了不管怎样,当64位的缓存行加载时value,不会有其他变量共享缓存行,从而解决了伪共享问题
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
* Sequence可以按照AtomicLong来理解,除了Sequence消除了伪共享问题,更加高效
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
/**
* Create a sequence initialised to -1.
*/
public Sequence()
{
this(INITIAL_VALUE);
}
/**
* Create a sequence with a specified initial value.
*
* @param initialValue The initial value for this sequence.
*/
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
/**
* Perform a volatile read of this sequence's value.
*
* @return The current value of the sequence.
*/
public long get()
{
return value;
}
/**
* Perform an ordered write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* store.
*
* @param value The new value for the sequence.
* 此方法等同于AtomicLong#lazySet(long newValue),
* 和直接修改volatile修饰的value相比,非阻塞,更高效,但更新的值会稍迟一点看到
*/
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Performs a volatile write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* write and a Store/Load barrier between this write and any
* subsequent volatile read.
*
* @param value The new value for the sequence.
*/
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
* Perform a compare and set operation on the sequence.
*
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
/**
* Atomically increment the sequence by one.
*
* @return The value after the increment
*/
public long incrementAndGet()
{
return addAndGet(1L);
}
/**
* Atomically add the supplied value.
*
* @param increment The value to add to the sequence.
* @return The value after the increment.
*/
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}
这个Sequence其实相当于AtomicLong,最大的区别在于Sequence解决了伪共享问题。另外Sequence#set相当于AtomicLong#lazySet。
致此,使用单生产者发布事件的流程就完成了。
3.3 多生产者发布事件
如果使用的是多生产者,占坑则调用MultiProducerSequencer.next()。
@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get(); // 当前游标值,初始化时是-1
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
可以发现,多生产者模式占坑和放置数据的逻辑和单生产者模式区别不大。区别主要是最后调用publish发布坑位的逻辑。
// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // 获取int[]数组类的第一个元素与该类起始位置的偏移。
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // 每个元素需要占用的位置,也有可能返回0。BASE和SCALE都是为了操作availableBuffer
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // 初始全是-1
private final int indexMask;
private final int indexShift;
@Override
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking(); // 如果使用BlokingWaitStrategy,才会进行通知。否则不会操作
}
@Override
public void publish(long lo, long hi)
{
for (long l = lo; l <= hi; l++)
{
setAvailable(l);
}
waitStrategy.signalAllWhenBlocking();
}
/
* availableBuffer设置可用标志
* 主要原因是避免发布者线程之间共享一个序列对象。
* 游标和最小门控序列的差值应该永远不大于RingBuffer的大小(防止生产者太快,覆盖未消费完的数据)
*/
private void setAvailable(final long sequence)
{ // calculateIndex 求模%, calculateAvailabilityFlag 求除/
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{ // 使用Unsafe更新属性,因为是直接操作内存,所以需要计算元素位置对应的内存位置bufferAddress
long bufferAddress = (index * SCALE) + BASE;
// availableBuffer是标志可用位置的int数组,初始全为-1。随着sequence不断上升,buffer中固定位置的flag(也就是sequence和bufferSize相除的商)会一直增大。
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
private int calculateAvailabilityFlag(final long sequence)
{ // 求商 就是 sequence / bufferSize , bufferSize = 2^indexShift。
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence)
{ // 计算位置即求模,直接使用序号 与 掩码(2的平方-1,也就是一个全1的二进制表示),相当于 sequence % (bufferSize), bufferSize = indexMask + 1
return ((int) sequence) & indexMask;
}
对比SingleProducerSequencer的publish,MultiProducerSequencer的publish没有设置cursor,而是将内部使用的availableBuffer数组对应位置进行设置。availableBuffer是一个记录RingBuffer槽位状态的数组,通过对序列值sequence取ringBuffer大小的模,获得槽位号,再通过与ringBuffer大小相除,获取序列值所在的圈数,进行设置。这里没有直接使用模运算和触发运算,而使用更高效的位与和右移操作。
其他的操作,MultiProducerSequencer和SingleProducerSequencer类似,就不再赘述了。
4 剖析SingleProducerSequencer设计
上面已经把Disruptor的主要发布事件流程过了一遍,好奇如你,必然觉得意犹未尽。如果你没有,那肯定还是我讲的有问题,不代表Disruptor本身的精彩。
接下来说一说SingleProducerSequencer的设计。从中我们可以看到Disruptor解决伪共享问题的实际代码。
SingleProducerSequencer继承了抽象类SingleProducerSequencerFields,SingleProducerSequencerFields又继承了抽象类SingleProducerSequencerPad。其中SingleProducerSequencerFields是实际放置有效实例变量的位置。
// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
/**
* Set to -1 as sequence starting point
*/
protected long nextValue = Sequence.INITIAL_VALUE; // 生产者申请的下一个序列值
protected long cachedValue = Sequence.INITIAL_VALUE; // 缓存上一次比较的门控序列组和next的较小值(最慢消费者序列值)
}
/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
* <p>
* <p>Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
* to {@link Sequencer#publish(long)} is made.
*/
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
// ...省略
}
可以发现,在两个实例变量前后各有7个long型变量。为什么这样做呢?对CPU缓存有了解的同学一定知道的……对,就是为了解决伪共享问题。
CPU在加载内存到缓存行时,一个缓存行中最多只有这两个有效变量,最大限度地避免了因伪共享问题,导致缓存失效,而造成性能损失。
为了更清晰地阐述这个道理,我们尝试看一下SingleProducerSequencer实例的内存布局。
使用HSDB(HotSpot Debugger,可通过 java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB 启动)跟踪demo对应的已断点的HotSpot进程,从Object Histogram对象图中筛选出SingleProducerSequencer实例,并通过Inspector工具对SingleProducerSequencer实例进行查看。
本例中,0x00000000828026f8为com.lmax.disruptor.SingleProducerSequencer实例在JVM中的内存起始位置。以此内存地址通过mem命令查看后续的30个内存地址内容。为啥要30个呢?其实20个就够了,可以看到"Object Histogram"中SingleProducerSequencer实例的size是160字节,mem打印一行表示一字长,对应到我本机的64位机器即8字节,所以长度选择大于等于160/8=20就可以看到SingleProducerSequencer实例的内存布局全貌。
左侧红框中的地址0x0000000082802750和0x0000000082802758分别对应右侧红框中的nextValue和cachedValue两个实例变量。而在它们前后,各有7个连续的long型整数0。CPU在加载连续内存到缓存时,以缓存行为单位。缓存行通常为64B,通过占位,可以让实际变量独享一个缓存行。从而解决了伪共享问题。
缓存行查看:linux可使用以下命令查看。
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
windows可使用CPU-Z查看。
附录:JAVA对象的内存布局相关知识
最后再说点Java对象的内存布局,和本文主题关系不大,可以略过。
HotSpot对象内存布局:
HotSpot中一个对象(非数组)的内存布局大概是这样的:对象头(Mark Word + klass pointer) + 实际数据 + 为了保持8字节对齐的填充。其中对象头的Mark Word和klass pointer长度各为一机器字(machine-word),即32位机器对应32bit(4字节),64位机器对应64bit(8字节)。如64位JVM开启了指针压缩,klass pointer将压缩到4字节。
查看是否开启了指针压缩:
jinfo -flag UseCompressedOops pid 返回-XX:+UseCompressedOops即为开启,或jinfo -flags pid 查看全部选项。
此例中返回了-XX:+UseCompressedOops,表示开启了指针压缩(jdk1.8默认开启)。此时普通类型指针将被压缩为4字节。
下面通过SingleProducerSequencer举一个实际的例子。
SingleProducerSequencer属性
使用HSDB Inspector查看实例。
image.png查看对象内存内容:
hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word 存储对象运行时数据,如哈希码、GC分代年龄、锁状态标志、线程持有锁、偏向线程ID、偏向时间戳
0x0000000082802700: 0x000000082000de38 // 高4位(82802704~82802707):int bufferSize 8 ,低4位(82802700~8280273):2000de38。由于开启了指针压缩,低4位表示klass pointer,由于使用的JDK1.8,klass metadata保存在Metadataspace中。
0x0000000082802708: 0x828028e082809e98 // 高4位:ref cursor,低4位: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // ref gatingSequences ObjArray
0x0000000082802718: 0x0000000000000000 // 包括当前行的以下7行 SingleProducerSequencerPad中定义的p1~p7
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue 1
0x0000000082802758: 0xffffffffffffffff // cachedValue -1
0x0000000082802760: 0x0000000000000000 // SingleProducerSequencer定义的p1~p7
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000
计算此对象的Shallow Heap size 和 Retained Heap size:
可以发现此对象一共占用20*8=160B内存,此值即Shallow Heap size。也可以手工计算:mark_word[8] + klass_pointer[4] + 2 * ref[4] + ObjArray_ref[8] + 16 * long[8] + int[4] = 160B
而保留内存大小Retained Heap size = Shallow Heap size + (当前对象的引用对象排除GC Root引用对象)的Shallow Heap size。
这里涉及到的引用为:cursor 0x00000000828028e0 ,waitStrategy 0x0000000082809e98 ,gatingSequences 0x000000008284b390。
分别使用revptrs命令查找反向引用,发现只有gatingSequences为此对象唯一引用,故计算gatingSequences(com.lmax.disruptor.Sequence[1] ) Shallow Heap size = 12 + 4 + 1 * 4 + 4 = 24B。这里由于开启了压缩指针,引用指针占用4B,此时占用20B,需要填充4B补满24B。故对象的Retained Heap size为160+24=184。
hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0
数组对象的Shallow Heap size=引用对象头大小12字节+存储数组长度的空间大小4字节+数组的长度*数组中对象的Shallow Heap size+padding大小
最后还有个问题,我们知道从Java8开始,Metaspace替代之前的PermGen存储元信息。使用Java7的HSDB是可以通过universe命令查看到PermGen信息的,而Java8就查不到Metaspace信息。
Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]
Disruptor生产者相关源码就分享到这,后续将对消费者一探究竟。
参考资料:
- Java对象内存布局(推荐,写的很棒) http://www.jianshu.com/p/91e398d5d17c
- JVM——深入分析对象的内存布局 http://www.cnblogs.com/zhengbin/p/6490953.html
- 借HSDB来探索HotSpot VM的运行时数据 http://rednaxelafx.iteye.com/blog/1847971
- markOop.hpp https://github.com/dmlloyd/openjdk/blob/jdk8u/jdk8u/hotspot/src/share/vm/oops/markOop.hpp
- Shallow and retained sizes http://toolkit.globus.org/toolkit/testing/tools/docs/help/sizes.html
- AtomicLong.lazySet是如何工作的? http://ifeve.com/how-does-atomiclong-lazyset-work/
- 《深入理解Java虚拟机》2.3.2 对象的内存布局