Java 1.8 & Android Oreo 常用源码分析

Java8 LinkedBlockingQueue 源码解析

2018-08-14  本文已影响4人  没有颜色的菜

LinkedBlockingQueue 链表阻塞队列

链表阻塞队列,顾名思义,也就是一个基于队列的阻塞式的链表实现,里面的代码写的很漂亮,生产者消费者模式在这个类中用的酣畅淋漓,其作者是大名鼎鼎的 Doug Lea,掌握这个类是比较重要的。里面很多实现基于锁,可以好好学习一下。

前世今生

继承自AbstractQueue 实现了 BlockingQueue,那我们就来看一下有哪些接口,这些是基础,要牢记


Screenshot from 2018-08-14 22-50-27.png

主要就是take(拿)和put(放),其余的offer,poll,add 有更高级的用法,稍后再说

使用场景

我查了一些资料,感觉和 MQ 有点联系,就是说我们可以使用这个东西进行解耦,或者负载均衡,比如说,有很多任务需要提交,我们可以把任务提交给 Queue,消费者负责处理消息,这个可以根据消费者的能力决定任务的执行效率,不会一下字任务过来而导致崩溃,讲道理,可以适合多生产者,多消费者模式,如果有这个,我们可以很好的进行解耦,负载均衡

实现原理大白话

属性

绝对的重头戏,上来就一堆干活,可以去补补 ReentrantLock 和 Condition 的知识,看一下,拿和取都用了一把锁,takeLock 对应的是 notEmpty Condition,putLock 对应的是 notFull Condition,
大致的意思就是,当你拿的时候,如果发现队列为空,则需要等待,当你放的时候,如果发现队列满了,也需要等待,这里的等待靠的就是notEmpty 和 notFull 的 await 方法,当你往队列添加元素成功后,则notEmpty.signal() 提醒消费者可以拿东西了,反之,当你取完一个东西之后,则 notFull.singal()提醒生产者可以放东西了。

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

还有一些其他的参数,容量以及对头队尾,默认容量为 Interger.MAX__VALUE,还有原子类的AtomicInterger 用于计数,保证原子性

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

函数

构造函数,head 和 tail 设置为空的节点对象,稍后在分析作用

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

put 函数,// 首先获取 putLock 的锁,保证putLock的线程安全性

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 构造一个新的节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 首先获取 putLock 的锁,保证putLock的线程安全性
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 如果队列已满,则等待消费者调用 notFull.signal()
            while (count.get() == capacity) {
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 原子加1
            c = count.getAndIncrement();
            // 如果此时队列没有满,调用 signal 更新 count
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放锁
            putLock.unlock();
        }
        // 如果 c 为 0;这里的 c 为之前的值
        // 也就是说,现在已经加1了,可以提示生产者
        if (c == 0)
            signalNotEmpty();
    }

入队,很简单,写的很巧妙,学习一下

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

出队,手动GC

   private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

获取 takeLock,通知消费者可以消费了

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

顺便看一下,通知生产者可以放入队列了

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

拿数据,首先还是要获取拿的锁,如果count == 0 ,那么则说明没有数据,需要阻塞等待,想象一下,这时候又想放入数据,那么将阻塞在takeLock里面,知道生产者调用了signal,执行出队。
注意一个问题,由于多线程的不定性,每次c > 1时,都需要再次signal,由于,多个消费者到会阻塞到await,而不是外面的 lockInterruptibly,所以每次都需要 signal,更新 count 的状态

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 阻塞,请注意,多个消费者都会阻塞在这里
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            // 注意,获取到更新前的值
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 如果之前队列满了,那么现在不会满了,提示生产者可以生产了
        if (c == capacity)
            signalNotFull();
        return x;
    }

这个方法如果没有数据直接返回空

  public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

同样的,这个方法利用了condition 的延迟性,可等待一定时间,如果超过这个时间还没有结果则返回null

 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

放入数据,同样,如果等待一定时间还不能放入,那么将抛弃

     public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

为了让大家更加明白 ReentrantLock,我这里给出一个例子供大家学习

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Create by xuantang
 * @date on 8/22/18
 */
public class ReentrantLockDemo {
    private static ReentrantLock mLock = new ReentrantLock();
    private static Condition mCondition = mLock.newCondition();

    public static void main(String[] args) {
        new WaitThread("waiter one").start();
        new WaitThread("waiter two").start();
        new WaitThread("waiter three").start();
        new NotifyThread("notify one").start();
    }

    static class WaitThread extends Thread {
        WaitThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            try {
                mLock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println(this.getName() + " Waiting......");
                mCondition.await();
                System.out.println(this.getName() + " Finished.....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mLock.unlock();
            }
        }
    }

    static class NotifyThread extends Thread {

        NotifyThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            try {
                mLock.lockInterruptibly();

                mCondition.signal();
                System.out.println(this.getName() + " Notify.....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mLock.unlock();
            }
        }
    }
}

输入结果,只能唤醒一个,当然你可以使用 signalAll() 唤醒所有的

waiter one Waiting......
waiter two Waiting......
waiter three Waiting......
notify one Notify.....
waiter one Finished.....

小结

我们看到了可重入锁,Condition的高级用法利用,以及生产者消费者模型,通过了解源码,我们更加深入的学习到了这个模型的用法以及实现。

上一篇 下一篇

猜你喜欢

热点阅读