队列同步器AQS

2018-06-28  本文已影响34人  蒸汽飞船

队列同步器AQS

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。

子类推荐被定义为自定义同步组件的静态内部类(如RetrantLock等类内部的Sync类),同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

队列同步器接口说明:

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

同步器提供的状态码相关操作方法:

getState():获取当前同步状态 
setState():设置当前同步状态 
compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性 
同步器可重写的方法

重写同步器指定的方法时,需要使用上述方法来访问或修改同步状态:

// 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后通过CAS设置同步状态
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

// 独占式释放同步锁,等待获取同步状态的线程有机会获取到同步状态
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
// 共享式获取同步状态,返回大于0表示成功,反之失败 
protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
// 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占
protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

提供的api方法:


// 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则进入同步队列等待,该方法将会调用重写的
// tryAccquire方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
//  与acquire方法相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,该方法
// 会抛出InterruptedException并返回
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

// 在acquireInterruptibly方法上添加了超时延迟,如果当前线程在超时时间内没有获取到同步状态返回false,否则返回true
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

// 共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的唯一区别是同一时刻可以有多个线程获取到同步状态
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

// 与acquireShared,只是该方法响应中断
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
// 在acquireShared基础上添加了超时间
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
// 独占式释放同步状态,该方法会在释放同步状态之后将同步队列第一个节点包含的线程唤醒
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

// 共享式释放同步状态 
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
// 获取等待在同步队列上的线程集合 
public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }
自定义独占锁:
package com.thread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {

    // 静态内部类,自定义同步器
    @SuppressWarnings("serial")
    private static class Sync extends AbstractQueuedSynchronizer{
        // 是否处于占用状态
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }

        // 当状态==0获取锁,更新state=1
        public boolean tryAcquire(int acquires){
            if(compareAndSetState(0, 1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放锁,将状态设置为0
        public boolean tryRelease(int release){
            if(getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 返回一个condition,每个condition都包含一个condition队列
        Condition newCondition(){
            return new ConditionObject();
        };
    }
    // 仅需要将操作代理到Sync上
    Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() { 
        return sync.isHeldExclusively(); 
    }

    public boolean hasQueuedThreads() { 
        return sync.hasQueuedThreads(); 
    }
}

上述示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases方法中只是将同步状态重置为0。用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

上一篇下一篇

猜你喜欢

热点阅读