JDK源代码

ArrayBlockingQueue源代碼解析(base jdk

2018-03-19  本文已影响0人  冰殇之刃

前记:

上篇文章写完之后,觉得代码贴的太多了,不过源代码

解析这种的,就是看源代码才有意思。主要是还是引导多思考,以
后写读源代码文章的博客的步骤,1.通过这段代码实现的功能
会先构思一下实现方案和代码架构,和一些其他的思考。2逐行
分析每个代码行的意思和意图。3.总结,对比其他类似的实现
或者其他的一些感悟。

正文:

ArrayBlockingQueue 是数组结构的堵塞队列的一种实现,那么肯定要实现的BlockingQueue接口。

解释一下接口含义
boolean add(E e); 队列添加元素,返回成功标识,队列满了抛出队列满的异常,无堵塞。
boolean offer(E e);队列添加元素,返回成功标识,无堵塞。
void put(E e);队列添加元素,无返回值,队列满了会堵塞。
boolean offer(E e, long timeout, TimeUnit unit);队列添加元素,队列满了堵塞,设置有超时时间。
E poll();队列拉取元素,无堵塞,没有值返回null。
E take();队列拉取元素,队列空了会堵塞,等待能拉取到值为止
E poll(long timeout, TimeUnit unit);队列拉取元素,队列空了等待,设置有等待超时时间
E peek() ; 只读队首元素的值,没有返回空
int remainingCapacity(); 计算剩余容量
boolean remove(Object o); 移除元素
int drainTo(Collection<? super E> c, int maxElements); 移除元素放到入参的集合当中
public Iterator<E> iterator() jdk 1.8以后ArrayBlockingQueue还增加了迭代器功能,这个模块下面会重点介绍,很有意思。
堵塞队列提供的功能:
在多线程环境下提供一个类似于生产者和消费者这样的一个模型
提供一个FIFO的顺序读取和插入
那就引起我的思考:

怎么实现的堵塞机制和堵塞的超时机制?
作为一个集合类,数组结构的怎么在多线程环境下实现安全扩容?
1.8jdk版本为什么会增加迭代器功能?
1.元素

/** 堵塞队列中存放的对象 /
final Object[] items;
/
* 消费者获取对象的下一个对象下标,具体的操作有poll take peek remove /
int takeIndex;
/
* 生产者放入对象的下一个对象的下标,具体的操作有 put offer add /
int putIndex;
/
* 队列中元素的数量 /
int count;
/
* 这个锁就是实现生产者,消费者模型的锁模型,并且所有和并发相关的堵塞控制都是通过这个锁来实现的/
final ReentrantLock lock;
/
* 这个是有ReentrantLock 中的Condition一个标识队列中有元素非空标志,用于通知消费者队列中有数据了,快来取数据 /
private final Condition notEmpty;
/
* 这个也是ReentrantLock 中的Condition的一个标识,标识队列中的元素不满用于通知生产者队列中空地,快来塞数据/
private final Condition notFull;
/
*

*/

class Itrs {

/**
 * 将里面的元素设置成弱引用,目标就是当成缓存使用的
 * Node里面存放的其实迭代器
 */
private class Node extends WeakReference<Itr> {
    Node next;
     Node(Itr iterator, Node next) {
        super(iterator);
        this.next = next;
    }
}
/** 记录循环的次数,当take下标到0的时候为一个循环 cycle+1 */
int cycles = 0;
/** Node的前节点 */
private Node head;
/** 用于删除无用的迭代器 */
private Node sweeper = null;
/***
 * 这个标识删除探针
 */
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
/**初始化函数注册迭代器到迭代器集合里面

Itrs(Itr initial) {
    register(initial);
}
/**
 * 清理itrs 整理旧的过期的迭代器 所谓过期的迭代器,是被标识为none 或者是Detached就是被取走的

/***

/
private class Itr implements Iterator<E> {
/
* 光标,是迭代器下一次迭代时的坐标,迭代器没有需要遍历的对象了,这个值会为负值/
private int cursor;
/
* 下一个元素内容,调用Iterator.next方法拿到的值 /
private E nextItem;
/
* 下一个元素的下标,none 是-1 被移除了是-2对应下面的static int /
private int nextIndex;
/
* 上一个元素的内容 /
private E lastItem;
/
* 上一个元素的的下标,none 是-1 被移除的是-2 同样对应下面的static int /
private int lastRet;
/
* 记录之前的开始遍历的下标,当这个迭代器判定为失效了这个值就是DETACHED /
private int prevTakeIndex;
/
记录之前循环次数的值,和Cycles进行比对,就知道有没有再循环过 /
private int prevCycles;
private static final int NONE = -1;
/
元素被调用remove方法移走,的状态/
private static final int REMOVED = -2;
/** 分离分开的Special value for prevTakeIndex indicating "detached mode" /
private static final int DETACHED = -3;
/
迭代器的初始化函数从takeIndex位置开始遍历/
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//队列里面没有值
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
//队列首元素后一个
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
//注册到itrs,所有迭代器的集合,顺序注册的
itrs.register(this);
// 清理无用的迭代器
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}

方法:(只介绍复杂性,有代表性的)

/**

堵塞提交,超时返回false
/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//获取锁
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//这里是使用同步队列器的超时机制,在nanos的时间范围内,方法会在这里堵塞,超过这个时间段nanos的值会被赋值为负数,方法继续,然后在下一个循环返回false。这个标志是未满标志,队列里面未满就可以放进元素嘛。然后判断成功就是一个入队列操作
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
/

/**

/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//@key jdk1.8的新特性迭代器特性,这里是因为元素的出队列所以清理和这个元素相关联的迭代器
itrs.elementDequeued();
//对于生产者的通知
notFull.signal();
return x;
}
/
*

/**

回顾一下;

我介绍了ArrayblockingQueue其实是包含了两个部分一个是标准阻塞队列接口的实现。另一个是jdk1.8增加的迭代器。上一个满大街博客都能找的到,我就把接口描述了一下,然后介绍了两个还算是复杂一点的接口。和整个一个工作原理,没有太多使用case。主要是就是生产者和消费者模型。一个锁应用,和其他的JUC框架不一样。它什么操作都加锁,并发变串行。所以它就没有用到原子类修饰的共享变量。
关于迭代器部分好像是只有我这里有写。如果有百度上有看到相关ArrayBlockingQueue迭代器文章的请留言。毕竟我一家之言,还是有可能会有理解上的偏差。我们总结一下这个迭代器。首先跟别的设计一样,谁用谁new。这个不一样的是会增加一个注册到堵塞队列对象里面itrs上面。然后呢用了一个软引用,那么就GC可以回收避免内存溢出。然后会有对无用的迭代器的清理,类似于threadLocal那样。那么什么是无用的迭代器呢。标识无用就一个条件,我的迭代器标识的结点被覆盖了,因为它空间就这么大,举个例子一个大小5的堵塞队列。然后我建了一个迭代器,那么这个迭代器的下标就是0.然后迭代器我没有马上用,然后进出队列10次,那么之前节点的值已经被替换了。队列里面还有值,但是迭代器的值已经在take方法中被干掉了,已经失效了。判断条件就是cycle的循环次数。有兴趣可以好好了解一下,这应该是我看过的最复杂的迭代器了。
留一些问题:

1.这个迭代器为什么会比arrayList复杂这么多?

2.其实作为堵塞队列来说无非就是数据交换,拿有什么场景是需要迭代器的?而且本身就全都锁控制,效率就不高。还加入这么复杂的迭代模块。会更慢一些的?

这篇文章会看起来比较碎。尽力了。。没有整块的时间去写。而且没想这个迭代器这么复杂。花费我很多时间去研究(没错,这就是我脱稿的原因)

还有就是风格和上一篇不一样了。我希望可以让看这篇文章的人不光是可以学习到之前不知道的知识。也可以触发大家更多的去主动的思考,去思考模块的设计,功能的实现。而不是被动接受这篇文章所传递出来的内容。

还有就是看这种源码。一定要先框架,功能。摸透再去看细节。如果你对这个代码块所要完成的功能不够了解。拿看起来费劲。框架,功能这些都摸透了。再钻到细节上面去。我们可能用到的框架很多,拿要读的源代码那就太多了。其实阅读源代码我觉得是培养一个阅读代码的能力。一个是学习处理这种场景的解决方案,一个是学习编程风格,编码模式。还有就是可能会培养对编程、对探究的兴趣。毕竟工作不能只是为了赚钱。
上一篇 下一篇

猜你喜欢

热点阅读