并发编程

J.U.C之阻塞队列(二)

2019-10-25  本文已影响0人  今年五年级

LinkedBlockingQueue

与ArrayBlockingQueue区别

linkedBlockingQueue与ArrayBlockingQueue主要区别为

  1. ArrayBlockingQueue是数组结构的队列,是有界队列
  2. LinkedBlockingQueue是链表结构的队列,可以是有界队列也可以是无界队列(capacity=Integer.MAX_VALUE)

主要属性

    //队列容量
    private final int capacity;
    //队列元素个数
    private final AtomicInteger count = new AtomicInteger();
    //对头
    transient Node<E> head;
    //队尾
    private transient Node<E> last;
    //take,poll,peek等读操作的方法需要获取读锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //如果读操作的时候队列为空,则等待不空条件
    private final Condition notEmpty = takeLock.newCondition();
    //put,offer等写操作的方法需要获取写锁
    private final ReentrantLock putLock = new ReentrantLock();
    //如果写操作的时候队列为满,则等待不满条件
    private final Condition notFull = putLock.newCondition();

锁和条件的搭配关系:如要读取,则不仅要获取读锁,而且还要满足队列不为空的条件


未命名文件 (11).png

从上图可以看出来,读操作和写操作单独看都是安全的,并发问题在于一个写操作和一个读操作同时进行

源码分析

构造方法

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

初始化的时候会构造一个空节点,第一个元素入队的时候,队列中会有两个元素,读取元素时,也总是会获取头节点后面的一个节点,count的计数值不包括这个头节点

put(E e)

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        //将待插入的元素构造为一个Node节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //获取队列中元素个数
        final AtomicInteger count = this.count;
        //用写锁加一个可以被中断抛出异常的锁
        putLock.lockInterruptibly();
        try {
            //当队列中容量满则等待不满的条件
            while (count.get() == capacity) {
                //释放锁+park+被唤醒(signal or interrupt)
                notFull.await();
            }
            //此时被通知不满开始入队
            enqueue(node);
            //先返回队列中元素的数量给c再给队列元素数量+1
            c = count.getAndIncrement();
            //如果这个元素入队后还有位置可以插入
            if (c + 1 < capacity)
                //唤醒等待在不满的条件的线程
                notFull.signal();
        } finally {
            //入队后,释放掉写锁
            putLock.unlock();
        }
        //c==0,则代表队列这个元素入队列之前是空的(不包括head节点)
        if (c == 0)
            //这里做一次唤醒操作让其他线程读取新加入的元素
            signalNotEmpty();
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

signalNotEmpty()

private void signalNotEmpty() {
        //获取读锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //唤醒线程来读取
            notEmpty.signal();
        } finally {
            //释放读锁
            takeLock.unlock();
        }
    }

take()

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //获取到读锁才能进行出队操作
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            //出队
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                //出队以后队列还不为空,唤醒等待在不为空队列的线程
                notEmpty.signal();
        } finally {
            //出队释放掉读锁
            takeLock.unlock();
        }
        //读取发生之前满了
        if (c == capacity)
            //刚刚进行了一次出队操作,已经有位置了,唤醒写线程往里面写入
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        //设置这个节点Wie新的头节点
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

signalNotFull()

类比上面的signalNotEmpty

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

PriorityBlockingQueue

带优先级(排序)的blockingQueue实现

上一篇 下一篇

猜你喜欢

热点阅读