一些收藏

Java并发编程 AQS

2020-03-14  本文已影响0人  香沙小熊

1.为什么需要AQS

如果没有AQS,就需要每个协作工具自己实现:

2.AQS的作用

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便的被写出来。
有了AQS,构建线程协作类就容易多了。

比如:
Semaphore和AQS的关系
Semaphore内部有一个Sync类,Sync类继承了AQS

3.AQS的重要性、地位

AbstractQueuedSynchronizer是Doug Lea写的,从JDK1.5加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,我们用IDE看AQS的实现类,可以发现实现类有以下这些:


image.png

4.AQS内部原理解析

AQS最核心的就是三大部分:

4.1state状态
   protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
4.1 state状态
4.2 控制线程抢锁和配合的FIFO队列
image.png
4.3 期望协作工具类实现的获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同

获取方法
释放方法
还需要重写tryAcquire方法和tryRelease等方法

5.AQS应用实例、源码解析

AQS在ReentrantLock的应用

第一步:写一个来,想好协作的逻辑,实现获取/释放方法
第二步:内部写一个Sync类继承AbstractQueuedSynchronizer
第三步:根据是否独占重写tryAcquire/tryRelease或tryAcquireShared() 和tryReleaseShared()等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法

AQS在CountDownLatch的应用
* 构造函数
   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
   Sync(int count) {
            setState(count);
        }

调用AbstractQueuedSynchronizer

    protected final void setState(int newState) {
        state = newState;
    }
* getCount
    public long getCount() {
        return sync.getCount();
    }

调用AbstractQueuedSynchronizer

    protected final int getState() {
        return state;
    }
* countDown
    public void countDown() {
        sync.releaseShared(1);
    }

CountDownLatch内部Sync类实现tryReleaseShared

       protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

调用AbstractQueuedSynchronizer

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

把所有等待的线程去唤醒

 private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
* await
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

CountDownLatch内部Sync类实现tryAcquireShared

       protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

调用AbstractQueuedSynchronizer

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

doAcquireSharedInterruptibly把当前线程放在阻塞队列中进行等待

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

parkAndCheckInterrupt方法

  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

把当前线程挂起 进入阻塞状态

AQS在CountDownLatch的总结

调用CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞。
“共享锁”可获取到的条件,就是“锁计数器”的值为0
“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1

AQS在Semaphore的应用
* acquire
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

调用AbstractQueuedSynchronizer

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Semaphore中内部类

  /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

nonfairTryAcquireShared 检查剩余许可证数量够不够

      final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
* release
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

调用AbstractQueuedSynchronizer 中release方法

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

CountDownLatch内部Sync类实现tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

cas设置state值

AQS在Semaphore的总结
AQS在ReentrantLock的应用

ReentrantLock

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

调用AbstractQueuedSynchronizer 中release方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantLock 中内部类Sync类

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

unparkSuccessor方法表示唤醒其他线程

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

getState 代表重入的次数
free = true; 表示锁是自由状态

   public void lock() {
        sync.lock();
    }
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

setExclusiveOwnerThread 表示设置互斥独有线程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

添加等待节点

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

acquireQueued 获取的线程处于互斥的不可中断模式队列。用于条件等待方法以及获取。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
ReentrantLock释放锁分析

由于是可重入的,所以state代表重入的次数,每次释放锁,先判断是不是当前持有的锁的线程释放的,如果不是就抛异常,如果是的话,重入次数就减1,如果见到了0,就说明完全释放了,于是free就是true,并且把state设置为0.

ReentrantLock加锁分析

第一步:调用子类实现的获取锁的方法 tryAcquire 成功则结束if
Reentrantlock中两个子类的尝试修改锁状态方法
非公平 不判断当前是否还有其他节点在队列中 hasQueuedPredecessors
第二步:失败 进去 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter 将当前线程节点入队 先cas快速入队 失败进入循环cas入队
第三步 acquireQueued 尝试将线程阻塞 需要循环向前查找非取消的节点 然后才能进行阻塞 即 前一个节点非cancel 然后设置成SIGNAL 状态 这样前节点的线程如果释放了同步状态或者被取消时候会通知本线程

调用lock方法,存在竞争的时候,T2会去入队,首先会初始化一个空节点,t2节点实际上存放的是第二个位置,t3进来的时候继续在后面排队,
t2和t3都是调用park方法进行阻塞。入队的时候会将前面的节点的waitstatus状态由0改为-1。在调用unlock的时候会将waitstatus不等于0的释放。

6.AQS实现一个自己的Latch门闩

public class OneShotLatch {

    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }
    public void await() {
        sync.acquireShared(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
           setState(1);

           return true;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                    oneShotLatch.await();
                    System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
                }
            }).start();
        }
        Thread.sleep(5000);
        oneShotLatch.signal();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                oneShotLatch.await();
                System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
            }
        }).start();
    }
}
Thread-2尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-0尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
开闸放行Thread-2继续运行
开闸放行Thread-0继续运行
开闸放行Thread-3继续运行
开闸放行Thread-4继续运行
开闸放行Thread-1继续运行
Thread-5尝试获取latch,获取失败那就等待
开闸放行Thread-5继续运行
上一篇下一篇

猜你喜欢

热点阅读