redis

ArrayBlockingQueue

2019-06-18  本文已影响0人  Pillar_Zhong

enqueue

// putindex认为是写入的索引
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // 入队到putIndex的位置
    items[putIndex] = x;
    // 如果putIndex已经到队尾,那么重置从头开始
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 入队,当然notEmpty条件已经满足,需要唤醒
    notEmpty.signal();
}

dequeue

// takeindex认为是读取的索引
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 拿到takeIndex的值赋予x
    E x = (E) items[takeIndex];
    // 设置takeIndex位置为null
    items[takeIndex] = null;
    // 如果takeindex已经到末尾,那么从头开始
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 出队的结果,数组未满已经满足,需要唤醒
    notFull.signal();
    return x;
}

offer

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 独占锁加锁
    lock.lock();
    try {
        // 如果数组满,那么直接返回
        if (count == items.length)
            return false;
        else {
            // 否则入队
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

put

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 独占锁加锁,且可中断
    lock.lockInterruptibly();
    try {
        // 自旋等待数组未满为止
        while (count == items.length)
            notFull.await();
        // 入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果数组为空,返回null
        // 否则出队
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 自旋等待数组不为空为止,出队
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

peek

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 直接返回takeIndex位置的值,但并不移除它
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读