音视频开发之旅(55)-阻塞队列与无锁并发容器
目录
- 阻塞队列的定义和使用场景
- 阻塞的队列的实现原理
- 简单学习无锁并发容器之ConcurrentLinkedQueue和CAS
- 资料
- 收获
一、阻塞队列的定义和使用场景
阻塞队列(BlockingQueue)在队列Queue的基础上增加了两个场景的阻塞
- 当队列满时,再向队列添加数据会阻塞,直到队列不满时
- 当队列为空时,再向队列获取数据会阻塞,直到队列变为非空
阻塞队列常用于生产者消费者的场景
下面我们先来Queue和BolckingQueue接口的定义
//java.util.Queue
public interface Queue<E> extends Collection<E> {
//添加一个元素到队列,如果队列满时会抛出异常IllegalStateException
boolean add(E e);
//添加一个元素到队列,如果队列满时不会抛异常,而是返回false
boolean offer(E e);
//从队列中获取并移除一个元素,如果队列为空, 会抛出NoSuchElementException
E remove();
//从队列中获取并移除一个元素,如果队列为空, 不会抛异常,而是返回null
E poll();
//从队列中获取一个元素 但不移除。注意和remove的区别
//当队列为空时,会抛出异常NoSuchElementException
E element();
//从队列中获取一个元素,也不移除。注意和poll的区别
//当队列为空时,不会抛出异常,而是返回null
E peek();
}
//java.util.concurrent.BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
//插入一个元素到队列,如果队列满了,等待直到有空间空用
void put(E e) throws InterruptedException;
//插入一个元素到队列,如果队列满了,等待一定时间返回,或者有空间空用
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列的头元素,如果队列为空,则等待
E take() throws InterruptedException;
//从队列中获取并移除一个元素,如果队列为空,等待一段时间
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
我们可以看到BlockingQueue
继承自Queue
并且新增了几个阻塞的方法。
Java中BlockingQueue
接口有七个实现类,分别如下:
- ArrayBlockingQueue : 由数组结构组成的有界阻塞队列,在添加和获取时内部使用一个ReentrantLock可重入同步锁
- LinkedBlockingQueue:由链表结构组成的有界阻塞队列。在添加和获取时内部使用两个ReentrantLock,吞吐量高于ArrayBlockingQueue,Executors#newSingleThreadExecutor()和Executors#newFixedThreadPool(int)都使用了这个阻塞队列
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- SynchronousQueue:不存储元素的阻塞队列。每个插入操作必须等待另个一个线程调用的移除操作,否则一致处于阻塞状态。吞吐量一般高于LinkedBlockingQueue。Executors#newCachedThreadPool()使用了这个阻塞队列
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的支持延迟获取元素的无界阻塞队列
- TransferQueue:链表结构组成的无界阻塞队列
- BlockingDeque:链表结构组成的双向阻塞队列
二、阻塞的队列的实现原理(LinkedBlockingQueue)
我们以LinkedBlockingQueue来分析
//节点结构体
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 从队列获取元素时的可重入锁 ,非公平锁*/
private final ReentrantLock takeLock = new ReentrantLock();
/** 非空condition,等待队列非空*/
private final Condition notEmpty = takeLock.newCondition();
/** 向队列中插入元素时的可重入锁 ,非公平锁*/
private final ReentrantLock putLock = new ReentrantLock();
/** 非满condition,等待队列非满 */
private final Condition notFull = putLock.newCondition();
/**
* 当队列有元素后,发出非空信号
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 当队列由满到不满后,发出该非满信号
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
2.1 插入元素到队列
offer的实现 (添加一个元素到队列,如果队列满时不会抛异常,而是返回false)
public boolean offer(E e) {
...
int c = -1;
Node<E> node = new Node<E>(e);
//获取写 可重入锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//如果队列还未满,插入该元素节点
if (count.get() < capacity) {
// enqueue 插入元素到队列,一会我们在看下其实现
enqueue(node);
c = count.getAndIncrement();
//如果插入后,还队列还未满,发送未满信号
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果成功插入后,发送非空信号
if (c == 0)
signalNotEmpty();
return c >= 0;
}
put的实现 (插入一个元素到队列,如果队列满了,等待直到有空间空用)
public void put(E e) throws InterruptedException {
...
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//相比较offer这是差异点1
//采用了可中断锁,等待过程中可以接收中断
putLock.lockInterruptibly();
try {
//相比较offer这是差异点2,
//如果当前队列满了,则阻塞,等待非空的信号到来
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
enqueue的实现
private void enqueue(Node<E> node) {
//把当前节点作为队列先前未节点的next插入到队列中
//然后吧last指向新插入的节点
last = last.next = node;
}
2.2 从队列获取元素
poll的实现(从队列中获取并移除一个元素,如果队列为空, 不会抛异常,而是返回null)
public E poll() {
...
int c = -1;
//获取取 可重入锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//如果当前队列的元素个数大于0
if (count.get() > 0) {
//dequeue 从队列中获取一个元素,稍后再分析
x = dequeue();
//取出后如果队列中元素的个数还大于1
//(为什么不是大于0?
// 这是因为getAndDecrement的实现是先获取再减1),
// 则发出非空信号
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果c的值等于容器的值(由于getAndDecrement的实现是先获取再减1,这是队列从满变为了非满状态),则发出非满信号
if (c == capacity)
signalNotFull();
return x;
}
take的实现 (获取队列的头元素,如果队列为空,则等待)
public E take() throws InterruptedException {
...
int c = -1;
final ReentrantLock takeLock = this.takeLock;
//和poll的差异点1:wait时支持中断
takeLock.lockInterruptibly();
try {
//和poll的差异点2:如果队列为空,则阻塞等待,知道收到非空的信号
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue的实现
private E dequeue() {
//链表操作的通用做法,head是一个虚节点
Node<E> h = head;
//头节点的next赋值给定义的first节点
Node<E> first = h.next;
//把先前的头节点头的next指向自身节点,方便gc
h.next = h; // help GC
//标记新的头节点给到head指针
head = first;
//获取元素
E x = first.item;
first.item = null;
return x;
}
为了方便dequeue的理解,画下列表的节点图如下
我们看先LinkedBlockingQueue再线程池中的使用,前面已经提到了,Executors#newSingleThreadExecutor()和Executors#newFixedThreadPool(int)都使用了LinkedBlockingQueue,我们通过下面两张来自《java并发编程的艺术》的示意图来看下
其他阻塞队列的实现可以自行分析下,比如ArrayBlockingQueue和SynchronousQueue的实现。
三、简单学习无锁并发容器之ConcurrentLinkedQueue和CAS
上面介绍的LinkedBlockingQueue通过加锁阻塞的方式保证线程安全性。还有一种非阻塞的算法实现。ConcurrentLinkedQueue就是通过后者实现的,我们一起来分析学习下。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
static <E> Node<E> newNode(E item) {
Node<E> node = new Node<E>();
//这里的U是sun.misc.Unsafe
U.putObject(node, ITEM, item);
return node;
}
static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) {
return U.compareAndSwapObject(node, NEXT, cmp, val);
}
public boolean offer(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (casNext(p, null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
Unsafe类
Unsafe类中存在直接操作内存的方法 ,Java中CAS操作的执行依赖于Unsafe类的方法,注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务
CAS为什么能保证原子性?
无锁策略则采用一种称为CAS的技术来保证线程执行的安全性
CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下
CAS(V,E,N)
其包含3个参数
V表示要更新的变量
E表示预期值
N表示新值
//如果V值等于E值,则将V的值设为N。若V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做
假设存在多个线程执行CAS操作并且CAS的步骤很多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?
答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
Unsafe这块源码解析和理解还是有些不足,根据需要再去看吧,Java并发系列到这里就暂时告一段落。
接下来进入编解码的学习时间,准备建立学习和写作打卡群,有兴趣的欢迎加我微信“yabin_yangO2O”,备注 视频编码读书写作
,一起学习成长。
四、资料
- 图书:《Java并发编程的艺术》
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- Java并发编程-无锁CAS与Unsafe类及其并发包Atomic
五、收获
通过本篇的学习实践
- 分析了java并发阻塞队列的应用和实现
- 简单分析学习了CAS和无锁并发容器ConcurrentLinkedQueue
感谢你的阅读,Java并发编程到这里就暂告一段落,接下来一段时间会进入编码的学习时间。
主要是针对《视频编码全角度详解》这本书的阅读和实践。以21天为一个周期(不一定要读完,但是每天至少读一页,且至少输出50字),有兴趣的朋友可以一起来学习交流,加我微信“yabin_yangO2O”,备注 视频编码读书写作
下一篇我们开始视频编码知识的学习实践,欢迎关注公众号“音视频开发之旅”,一起学习成长。
欢迎交流