【BlockingQueue】ArrayBlockingQueu
ArrayBlockingQueue
基于数组实现的有界队列,put()和take()方法为阻塞方法,内部使用ReentryLock方法实现
常用方法:
add():
内部调用了offer()方法,如果队列满,则Queue full异常
offer():
如果队列满则返回false,不继续添加
put():
内部用Condition实现,如果队列满,则notFullCondition.await()等待唤醒
take():
队列中为空则阻塞,删除队列头部元素并返回
poll():
非阻塞,队列为空则返回null,否则删除头部队列并返回元素
peek():
返回队列头部元素,并不删除
LinkedBlockingQueue
1.内部有两把锁,putLock+takeLock,提高的吞吐率
基于链表实现,默认capacity=Integer.MAX_VAlUE;
常用方法:
offer():
1.如果队列size>=capacity,则直接返回false
2.队列不满则直接将对象封装成node入队,然后再判断队列是否满,不满的唤醒添加线程,notFull.singal()
3.当c(=count. getAndIncrement())==0时,说明有消费线程等待,需要唤醒notEmtpy.singal(),需要对takeLock进行加锁
public boolean offer(E e) {
//添加元素为null直接抛出异常
if (e == null) throw new NullPointerException();
//获取队列的个数
final AtomicInteger count = this.count;
//判断队列是否已满
if (count.get() == capacity)
return false;
int c = -1;
//构建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//再次判断队列是否已满,考虑并发情况
if (count.get() < capacity) {
enqueue(node);//添加元素
c = count.getAndIncrement();//拿到当前未添加新元素时的队列长度
//如果容量还没满
if (c + 1 < capacity)
notFull.signal();//唤醒下一个添加线程,执行添加操作
}
} finally {
putLock.unlock();
}
// 由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等到线程,因此count肯定会变化。
//这里的if条件表示如果队列中还有1条数据
if (c == 0)
signalNotEmpty();//如果还存在数据那么就唤醒消费锁
return c >= 0; // 添加成功返回true,否则返回false
}
//入队操作
private void enqueue(Node<E> node) {
//队列尾节点指向新的node节点
last = last.next = node;
}
//signalNotEmpty方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
//唤醒获取并删除元素的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
add():
add方法内部调用了offer()方法,如果队列满,则报"queue full"异常
put():
1.获取putLock锁,如果队列已满,则等待,否则入队
2.再次判断队列大小,如果没满,则唤醒添加线程notFull.singal()
3.判断c(=count.getAndIncrement())是否==0,如果是,表明可能有消费线程等待,需要唤醒notEmpty.singal(),需要获取takeLock锁
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
取数据方法:
poll():
1.如果没有数据,则返回null,如果有则返回数据
2.如果队列的大小c大于1,则唤醒notEmpty消费线程
3.如果c==capacity,表示notFull上等待有添加线程,需要唤醒,这点与前面分析if(c==0)是一样的道理。因为只有可能队列满了,notFull条件对象上才可能存在等待的添加线程
take()方法:
1.take是可阻塞、可中断的移除方法,如果队列为空,则notEmpty.await()
2.如果队列不为空(c>1),则唤醒notEmpty.singal(),唤醒其他等待的消费线程
3.if(c==capacit)表明可能有添加线程等待在notFull上,需要唤醒notFull.singal()【需要对putLock加锁】
remove()方法:
1.需要对putLock和takeLock同时加锁
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断
try {
//如果队列没有数据,挂机当前线程到条件对象的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在数据直接删除并返回该数据
x = dequeue();
c = count.getAndDecrement();//队列大小减1
if (c > 1)
notEmpty.signal();//还有数据就唤醒后续的消费线程
} finally {
takeLock.unlock();
}
//满足条件,唤醒条件对象上等待队列中的添加线程
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
//获取当前队列的大小
final AtomicInteger count = this.count;
if (count.get() == 0)//如果没有元素直接返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//判断队列是否有数据
if (count.get() > 0) {
//如果有,直接删除并获取该元素值
x = dequeue();
//当前队列大小减一
c = count.getAndDecrement();
//如果队列未空,继续唤醒等待在条件对象notEmpty上的消费线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//判断c是否等于capacity,这是因为如果满说明NotFull条件对象上
//可能存在等待的添加线程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;//获取头结点
Node<E> first = h.next; 获取头结的下一个节点(要删除的节点)
h.next = h; // help GC//自己next指向自己,即被删除
head = first;//更新头结点
E x = first.item;//获取删除节点的值
first.item = null;//清空数据,因为first变成头结点是不能带数据的,这样也就删除队列的带数据的第一个节点
return x;
}
LinkedBlockingQueue和ArrayBlockingQueue迥异
1.array和linked两种队列大小不一样,array在构建时必须指定队列大小,而linked可以指定队列大小,如果没有指定,默认为integer.max_value
2.代码实现不同,array内部使用了一把ReentryLock,linked采用锁分离,使用了两把锁,putLock和takeLock,在高并发场景下,可以并行的对队列进行操作,提高了吞吐率
3.linked当添加线程快于消费线程时,会造成内存溢出等问题
4.由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
【参考博客】
https://blog.csdn.net/javazejian/article/details/77410889?locationNum=1&fps=1