Java多线程

J.U.C 阻塞队列(三) - LinkedBlockingQu

2019-05-16  本文已影响0人  贪睡的企鹅

1 概述

LinkedBlockingQueue 是一个用单项链表实现的的线程安全有界的阻塞队列。队列按照先进先出(FIFO)的原则对元素进行排序。

通过以下关键词分析我们来更深入理解ArrayBlockingQueue

1.1 如何理解“队列”

队列这个概念非常好理解。你可以把它想象成排队买票,先来的先买,后来的人只能站末尾,不允许插队。先进者先出,这就是典型的“队列”。

相对于栈只支持两个基本操作:入栈 push()和出栈 pop(),对于也只支持两个操作入队 enqueue(),放一个数据到队列尾部;出队 dequeue(),从队列头部取一个元素,因此队列跟栈一样,也是一种操作受限的线性表数据结构

image
1.2 如何理解“线程安全的”

在多线程情况下,会有多个线程同时操作队列,这个时候就会存在线程安全问题,线程安全的队列我们叫作并发队列.

那如何实现一个线程安全的队列呢?

方式一
最简单直接的实现方式是直接在 enqueue()、dequeue() 方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。

方式二
利用 CAS 原子操作,可以实现非常高效的并发队列。

1.3 阻塞队列

阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。

1.4 有界队列

有界队列表示队列中存储数据是有限,如果队列满后在次向队列中添加数据会失败或阻塞。

2 实现一个"队列"

我们知道了,队列跟栈一样,也是一种抽象的逻辑存储结构。它具有先进先出的特性,支持在队尾插入元素,在队头删除元素。如果要想实现一个队列可以用数组来实现,也可以用链表来实现,用数组实现的队列叫作顺序队列,用链表实现的队列叫作链式队列

2.1 链式队列

基于链表的实现,我们同样需要两个指针:head 指针和 tail 指针。它们分别指向链表的第一个结点和最后一个结点。

入队
tail->next= new_node, tail = tail->next;

出队
head = head->next

image

3 LinkedBlockingQueue源码解析

3.1 类结构

image

3.2 实现原理

1 LinkedBlockingQueue时基于链表实现的队列

2 相对于ArrayBlockingQueue,链表实现的队列出队入队完全可以多线程执行,并不相互依赖。因而按时锁的原则来说锁的粒度越小越好,因而LinkedBlockingQueue使用了2把锁,分别针对出队入队各自分配一把锁(takeLock,putLock)来保证线程安全。

3 LinkedBlockingQueue内部存在着二个可重入的锁(takeLock,putLock),同时分别生成二个等待队列Condition(notFull,notEmpty) -- 生产者和消费者模式

3.3 核心属性

    static class Node<E> {
        //队列节点中保存元素
        E item;
        //当前节点的下一个节点
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 队列容量 */
    private final int capacity;

    /** 队列保存的元素个数 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 头部节点
     */
    transient Node<E> head;

    /**
     * 尾部节点
     */
    private transient Node<E> last;

    /** 出队锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 出队等待队列 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 入队锁c */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 入队等待队列 */
    private final Condition notFull = putLock.newCondition();
    

3.4 构造函数

//创建一个带有默认的(Integer.MAX_VALUE)容量队列
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

//创建一个带有给定的(capacity)容量队列
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

//创建一个带有默认的(Integer.MAX_VALUE)容量,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //将指定容器中元素依此添加到队列中
    final ReentrantLock putLock = this.putLock;
    //获取putLock独占锁,成功返回,失败则阻塞
    putLock.lock(); 
    try {
        int n = 0;
        for (E e : c) {
            //检查添加数据不能为null,NullPointerException
            if (e == null)
                throw new NullPointerException();
            //添加collection超出,IllegalStateException    
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //添加到队列尾部    
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

3.5 入队操作

将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。

public boolean offer(E e) {
        //添加元素为null,抛出异常
        if (e == null) throw new NullPointerException();
        
        // 如果“队列已满”,则返回false,表示插入失败。
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
            
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //获取putLock独占锁,成功返回,失败则阻塞
        putLock.lock();
        try {
            if (count.get() < capacity) {
                //将当前添加节点添加队列尾部
                enqueue(node);
                // 将“当前节点数量”+1,并返回“原始的数量”
                c = count.getAndIncrement();
                // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // 释放“插入锁putLock”
            putLock.unlock();
        }
        // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    //将当前添加节点添加队列尾部
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    
    //释放notEmpty等待队列中线程
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock独占锁
        takeLock.lock();
        try {
            //释放notEmpty等待队列中线程
            notEmpty.signal();
        } finally {
            //释放takeLock独占锁
            takeLock.unlock();
        }
    }

将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间,在成功时返回 true。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //添加元素为null,抛出异常
        if (e == null) throw new NullPointerException();
        //获取等待的时间
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取putLock独占锁,成功返回,失败则阻塞
        putLock.lockInterruptibly();
        try {
            //如果队列已满,等待时间大于0。将当前线程添加notFull等待队列,并限时阻塞当前线程
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            //将当前添加节点添加队列尾部
            enqueue(new Node<E>(e));
            //将“当前节点数量”+1,并返回“原始的数量”
            c = count.getAndIncrement();
            // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上的等待线程。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放“插入锁putLock”
            putLock.unlock();
        }
        // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上的等待线程
        if (c == 0)
            signalNotEmpty();
        return true;
    }

put,该操作成功返回 true,失败则进入阻塞。

 public void put(E e) throws InterruptedException {
        //添加元素为null,抛出异常
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取putLock独占锁,成功返回,失败则阻塞,响应中断
        putLock.lockInterruptibly();
        try {
            //如果队列已满。将当前线程添加notFull等待队列,并阻塞当前线程
            while (count.get() == capacity) {
                notFull.await();
            }
            //将当前添加节点添加队列尾部
            enqueue(node);
             //将“当前节点数量”+1,并返回“原始的数量”
            c = count.getAndIncrement();
           // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放“插入锁putLock”
            putLock.unlock();
        }
        // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程
        if (c == 0)
            signalNotEmpty();
    }

3.6 出队操作

public E poll() {
    final AtomicInteger count = this.count;
    //如果队列元素为空,返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //获取putLock独占锁,失败则阻塞
    takeLock.lock();
    try {
        //队列中存在元素
        if (count.get() > 0) {
            //获取队列首部元素
            x = dequeue();
            // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
            c = count.getAndDecrement();
            // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        //释放takeLock独占锁
        takeLock.unlock();
    }
    //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
    if (c == capacity)
        signalNotFull();
    return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        //获取等待的时间
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,将当前线程添加到notEmpty等待队列,并限时阻塞当前线程
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
             //获取队列首部元素
            x = dequeue();
            // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
            c = count.getAndDecrement();
            // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //释放takeLock独占锁
            takeLock.unlock();
        }
        //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
        if (c == capacity)
            signalNotFull();
        return x;
    }
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,将当前线程添加notEmpty等待队列,并阻塞当前线程
            while (count.get() == 0) {
                notEmpty.await();
            }
            //获取队列首部元素
            x = dequeue();
            // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
            c = count.getAndDecrement();
            // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //释放takeLock独占锁
            takeLock.unlock();
        }
        //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态
        if (c == capacity)
            signalNotFull();
        return x;
    }

3.7 指定元素出队

public boolean remove(Object o) {
        if (o == null) return false;
        //同时获取putLock,takeLock独占锁
        fullyLock();
        try {
            //从头部节点开始向后遍历查找删除节点以及其前置前置节点
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    //将节点从链表中删除
                    //p表示将要删除的节点
                    //trail表示要删除的节点前置系欸但
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //同时释放putLock,takeLock独占锁
            fullyUnlock();
        }
    }
    
    //将节点从链表中删除
    //p表示将要删除的节点
    //trail表示要删除的节点前置节点
     void unlink(Node<E> p, Node<E> trail) {
        //将节点p从链表中删除
        p.item = null;
        trail.next = p.next;
        //如果节点p是链表尾部,last指向trail
        if (last == p)
            last = trail;
         //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态    
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

3.8 查询操作

获取队列中元素数量

public int size() {
        return count.get();
}

查看队列中剩余的位置

public int remainingCapacity() {
        return capacity - count.get();
}

4 LinkedBlockingQueue使用

4.1 函数列表

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。
LinkedBlockingQueue()
// 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
LinkedBlockingQueue(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingQueue。
LinkedBlockingQueue(int capacity)
// 从队列彻底移除所有元素。
void clear()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
boolean offer(E e)
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
void put(E e)
// 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
int remainingCapacity()
// 从此队列移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回队列中的元素个数。
int size()
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
E take()
// 返回按适当顺序包含此队列中所有元素的数组。
Object[] toArray()
// 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

示例

import java.util.*;
import java.util.concurrent.*;

/*
 *   LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 *   下面是“多个线程同时操作并且遍历queue”的示例
 *   (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。
 *   (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 * @author skywang
 */
public class LinkedBlockingQueueDemo1 {

    // TODO: queue是LinkedList对象时,程序会出错。
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new LinkedBlockingQueue<String>();
    public static void main(String[] args) {
    
        // 同时启动两个线程对queue进行操作!
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while(iter.hasNext()) {
            value = (String)iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
                int i = 0;
            while (i++ < 6) {
                // “线程名” + "-" + "序号"
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // 通过“Iterator”遍历queue。
                printAll();
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读