LinkedBlockingQueue分析

2018-05-24  本文已影响1人  gczxbb

LinkedBlockingQueue内部结构采用链式存储,元素节点Node,封装对象E泛型,Node#next指向下一个节点。

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
存储结构图。 阻塞队列LinkedBlockingQueue链式存储结构.jpg

LinkedBlockingQueue#构造方法。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

参数说明
capacity:容量,默认是Integer.MAX_VALUE。
count:当前队列元素数量,AtomicInteger线程同步,可根据count与capactiy比较值作为是否已满的依据。
head/last指针:初始化创建第一个Node节点作为头结点与尾节点,封装内容是null。


LinkedBlockingQueue插入节点

put/offer方法:向尾部插入元素,加锁putLock。区别是,若队列已满,put方法线程阻塞等待,当有空余位置时,被唤醒,继续插入,offer直接返回插入失败。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();//以下代码段加锁
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // 到这里插入,说明有空余至少一个。
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();//若还有至少一个空余,通知其他等待的不满信号。
    } finally {
        putLock.unlock();//解锁
    }
    if (c == 0)
        signalNotEmpty();
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

offer方法比较简单,判断count未达到最大容量capactiy,插入节点。同理,根据空余空间发出未满信号。c的初始值是-1,若成功插入,则一定会赋值成一个>=0的值,因此。offer的返回的值可判断是否成功插入,offer不会阻塞。


LinkedBlockingQueue取出节点

take/poll方法:从队列头部加锁takeLock获取元素。若队列为空,take方法获取线程阻塞等待,有元素时被唤醒。poll返回失败。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

分析逻辑与put一致。

生产者与消费者模型,元素在头部获取,尾部插入。多线程并发操作时,采用两种锁保证线程同步,互相独立,并行操作队列元素,数量更新AtomicInteger同步。


任重而道远

上一篇下一篇

猜你喜欢

热点阅读