JDK源码解析

DelayQueue源码分析

2020-05-06  本文已影响0人  i砖工

延迟队列:往队列中放入的元素具有一定的延迟时间,延迟时间到期后,take或者poll方法才能获取到这些元素。
先看以下延迟队列的构造:

//1.队列中的元素通过实现Delayed接口来实现延迟时间控制
//2.队列实现BlockingQueue接口,实现了相应的阻塞功能
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //操作公共资源的地方,采用加锁方式实现
    private final transient ReentrantLock lock = new ReentrantLock();
    //队列内部使用优先级队列来存放元素
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //队列使用Condition来控制读写线程的相互等待
    private final Condition available = lock.newCondition
    //leader:用于标记是否已经有线程与头节点配对了,并且记录是哪个线程配对的。
    private Thread leader = null;   
}

作为一个阻塞队列,则一定具备offer,take,poll的能力,下面我们一一来看以下具体的实现。
offer方法:

//新增元素
public boolean offer(E e) {
    //通过锁来控制并发
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //优先队列中存放元素
        q.offer(e);
        //才放入的元素成为了头节点,则有两种情况,第一种是队列本身为空,这种情况下是否重置leader=null其实没有影响,因为这个时候本身leader就为Null
        //另外一种情况是新放入的元素由于优先级最高,所以变为了头节点,这个时候需要重新设置leader为null,并且唤醒等待中的线程,让取数线程重新和新的头节点配对
        //因为新的头节点的delay时间肯定不同了,所以需要线程醒来后,重新配对,重新设置睡眠时间(take方法的代码1处)
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take方法:

/***
一直等待,直到队列中有元素到期,或者线程被中断
***/
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); //获取锁并接受中断
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) //队列为空
                available.await(); //睡眠当前线程,释放锁,让其它线程(生产者或者消费者)操作
            else { //队列不为空
                long delay = first.getDelay(NANOSECONDS);//获取头节点的过期时间
                if (delay <= 0) //头节点已经过期,直接返回节点
                    return q.poll();
                first = null; // don't retain ref while waiting
                
                //头节点未过期,说明所有节点都还未过期,无法出队列, 当前线程需要睡眠等待
                if (leader != null)  //leader的设定是标记即将获取元素的线程,如果它不等于空,说明当前即将过期的头节点已经有其它线程预定了,所以当前线程只能睡眠了。
                    available.await();
                else {//如果leader等于空,则当前线程为本次获取该头节点的线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //代码1
                        available.awaitNanos(delay); //让当前线程睡眠时间等于头节点到期时间
                    } finally {
                        //当前线程从睡眠中醒来,或者被中断。如果是正常醒来, 则delay<=0了,下一次循环则会出队列,所以让出leader位置。
                        //如果是被中断,则说明本线程放弃了获取元素的权力,同样需要让出leader的位置
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //如果leader为空(当前头节点没有被预定),并且队列不为空,唤醒在条件上等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();//释放锁
    }
}

poll方法与take方法类似,只是多了一个方法自带的过期时间:

//带过期时间的获取元素
//该方法一直等待元素过期,直到方法指定的timeout到期
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);  //方法等待过期时间
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) { //如果头节点等于空
                if (nanos <= 0)  //方法等待到期
                    return null; 
                else
                    nanos = available.awaitNanos(nanos);  //队列中没有可以出队的元素,则让当前线程阻塞
            } else { //头节点不等于空
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) //头节点延迟时间过期,则直接出队列
                    return q.poll();
                    
                if (nanos <= 0)  //如果头节点还未过期,而方法等待时间过期,则返回null
                    return null;
                
                first = null; // don't retain ref while waiting  //线程等待期间不引用头节点
                
                
                //-----以下是阻塞队列的逻辑---------
                // //如果方法等待时间比头节点过期时间小,则睡眠方法等待时间(取小),否则如果leader!=null(头节点已经被预定),则同样睡眠方法等待时间(取大)
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos); 
                else { //如果方法等待的时间大于头节点过期时间(方法能等到头节点过期并将它取出,但是当前头节点还未过期),并且leader线程等于null
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; //当前线程获得与头节点配对的资格
                    try {
                        long timeLeft = available.awaitNanos(delay); //线程睡眠头节点过期时间
                        nanos -= delay - timeLeft;  //方法等待时间减去本次已经等待的实际时间,然后自旋重新检查元素过期情况。
                    } finally { //见take方法的说明
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {//见take方法的说明
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

总结:
1.要想使用延迟队列,则元素一定要实现Delayed接口
2.延迟队列中,线程安全的问题是通过显示的重入锁控制的,所以同一时间只有一个线程在操作
3.内部是用优先级队列来存放元素的,优先级队列非线程安全,所以才需要2.
4.线程等待是通过显示锁的条件进行等待的,由于Condition的实现也是一个FIFO队列,所以DelayQueue是一个公平策略的队列。
5.每一个线程是否进行睡眠,或者睡眠多久,其关键是leader决定的,因为leader告知了当前线程头节点是否已经被别的线程配对。

上一篇 下一篇

猜你喜欢

热点阅读