我所理解的Android编程思想Android开发经验谈数据结构和算法分析

Java并发(一)——线程安全的容器(上)

2018-02-22  本文已影响76人  yhthu

Java中线程安全的容器主要包括两类:

依笔者看,早期使用的同步容器主要有两方面的问题:1)通过对方法添加synchronized关键字实现同步,这种粗粒度的加锁操作在synchronized关键字本身未充分优化之前,效率偏低;2)同步容器虽然是线程安全的,但在某些外部复合操作(例:若没有则添加)时,依然需要客户端加锁保证数据安全。因此,从Java 5.0以后,并发编程偏向于使用java.util.concurrent包(作者:Doug Lea)中的容器类,本文也将着重介绍该包中的容器类,主要包括:

  1. 阻塞队列
  2. ConcurrentHashMap
  3. 写入时复制容器

一、阻塞队列

在并发环境下,阻塞队列是常用的数据结构,它能确保数据高效安全的传输,为快速搭建高质量的多线程应用带来极大的便利,比如MQ的原理就是基于阻塞队列的。java.util.concurrent中包含丰富的队列实现,它们之间的关系如下图所示:

并发队列关系图

下面对这些队列进行详细的介绍:

1.1 BlockingQueue与BlockingDeque

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

BlockingDeque在BlockingQueue的基础上,增加了支持双向队列的属性。如下图所示,相比于BlockingQueue的插入和移除方法,变为XxxFirstXxxLast方法,分别对应队列的两端,既可以在头部添加或移除,也可以在尾部添加或移除。

BlockingDeque插入/移除方法

1.2 LinkedBlockingQueue与LinkedBlockingDeque

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE,按照先进先出的原则对元素进行排序。

首先看下LinkedBlockingQueue中核心的域:

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

private final int capacity;
private final AtomicInteger count = new AtomicInteger();

transient Node<E> head;
private transient Node<E> last;

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

其次,LinkedBlockingQueue有三个构造方法,分别如下:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

默认构造函数直接调用LinkedBlockingQueue(int capacity)LinkedBlockingQueue(int capacity)会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化队列的同时,将一个集合的全部元素加入队列。

最后分析下puttake的过程,这里重点关注:LinkedBlockingQueue如何实现添加/移除并行的?

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

之所以把puttake放在一起,是因为它们是一对互逆的过程:

private void enqueue(Node<E> node) {
    last = last.next = node;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

回到刚才的问题:LinkedBlockingQueue如何实现添加/移除并行的?
LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理

同样的,LinkedBlockingDequeLinkedBlockingQueue的基础上,增加了双向操作的属性。继续以puttake为例,LinkedBlockingDeque增加了putFirst/putLasttakeFirst/takeLast方法,分别用于在队列头、尾进行添加和删除。与LinkedBlockingQueue不同的是,LinkedBlockingDeque的入队列和出队列不再使用不同的Lock。

final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

其中,lock表示读写的主锁,notEmpty和notFull依然表示相应的控制线程状态条件量。以putFirsttakeFirst为例:

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

putFirst不支持插入null元素,首先新建一个Node对象,然后调用ReentrantLocklock方法获取锁,插入操作通过boolean linkFirst(Node<E> node)实现,如果当前队列头已满,那么该线程等待(linkFirst方法在写入元素成功后会释放该锁信号),最后,在finally块中释放锁(ReentrantLock的使用)。

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

putFirst类似,takeFirst首先获取锁,然后在try中解除尾元素对象的引用,如果unlinkFirst为空,表示队列为空,没有元素可删,那么该线程等待。同样,最后在finally块中释放锁。

那么问题来了,LinkedBlockingDeque为什么不使用LinkedBlockingQueue读写锁分离的方式呢?LinkedBlockingDequeLinkedBlockingQueue的使用场景有什么区别呢?

1.3 DelayQueue

DelayQueue主要用于实现延时任务,比如:等待一段时间之后关闭连接,缓存对象过期删除,任务超时处理等等,这些任务的共同特点是等待一段时间之后执行(类似于TimerTask)。DelayQueue的实现包括三个核心特征:

因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}

接下来看下DelayQueue的读写操作如何实现延时任务的?

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

首先执行加锁操作,然后往优先队列中插入元素e,优先队列会调用泛型E的compareTo方法进行比较(具体关于二叉堆的操作,这里不再赘述,请参考数据结构部分相关分析),将延迟时间最短的任务添加到队头。最后检查下元素是否为队头,如果是队头的话,设置leader为空,唤醒所有等待的队列,释放锁。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

1.4 TransferQueue与LinkedTransferQueue

TransferQueue是一个继承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueueTransferQueue接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。

TransferQueue接口主要包含以下方法:

public interface TransferQueue<E> extends BlockingQueue<E> {
    boolean tryTransfer(E e);
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean hasWaitingConsumer();
    int getWaitingConsumerCount();
}

LinkedTransferQueue实现了上述方法,较之于LinkedBlockingQueue在队列满时,入队操作会被阻塞的特性,LinkedTransferQueue在队列不满时也可以阻塞,只要没有消费者使用元素。下面来看下LinkedTransferQueue的入队和和出队操作:transfertake方法。

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

LinkedTransferQueue入队和和出队都使用了一个关键方法:

private E xfer(E e, boolean haveData, int how, long nanos) {}

其中,E表示被操作的元素,haveDatatrue表示添加数据,false表示移除数据;how有四种取值:NOW, ASYNC, SYNC, 或者TIMED,分别表示执行的时机;nanos表示howTIMED时的时间限制。
xfer方法具体流程较为复杂,这里不再展开。另外,LinkedTransferQueue采用了CAS非阻塞同步机制,后面会具体讲到)

上一篇下一篇

猜你喜欢

热点阅读