AQS阅读指南

2019-07-30  本文已影响0人  alonwang
image.png

前言

阅读AQS源码的过程很是艰辛,在陆陆续续读了三四次后,大概搞懂了,特此分享一些阅读时的技巧/特点,希望能帮助大家更轻松的读懂AQS.

设计要点

1.1 Node

Node是AQS的内部数据结构,双向链表,一个Node表示一个线程.当一个线程"设置状态"失败,会被构造成Node添加到阻塞队列尾部. 当一个"设置状态"成功的线程执行完成,"释放状态"时,会唤醒阻塞队列首部之后的Node对应的线程尝试"设置状态"

static final class Node {
          //节点状态  
        volatile int waitStatus;
                //前一个节点
        volatile Node prev;
                //下一个节点
        volatile Node next;
                //对应的线程
        volatile Thread thread;
                //Condition使用
        Node nextWaiter;
                ...
}

1.2 state

"设置状态","释放状态"是AQS的一个抽象概念, 根据不同的用途,描述也不同,在锁上描述就是就是获取锁,释放锁. AQS提供了操作state的方法,由子类根据需要定义state的含义, 例如对于独占锁 0表示锁未被获取,1表示锁已被获取.

        /**
     * The synchronization state.
     */
    private volatile int state;

1.3 LockSupport

由于Thread.sleep和Thread.resume存在死锁问题,AQS中阻塞和唤醒线程使用LockSupport的park和unpark实现.它基于一套"契约"机制. 契约有且只能有一个,unpark给与一个契约,park消耗一个契约.如果先unpark再park,线程不会阻塞

LockSupport.unpark(xxx);
//LockSupport.unpark(xxx);... 无论调用多少次也只会有一个契约 
LockSupport.park(xxx);

1.4 Condition

AQS中仿照监视器的等待通知模式,使用Condtion接口提供了类似但更强大的功能

1.5 阻塞队列和等待队列

阻塞队列指获取锁失败被阻塞等待唤醒的线程,等待队列指调用Condition.wait()放弃锁等待被Condition.signal()唤醒的线程,两者都使用Node实现.

两者还有一点关联:当等待队列中的节点被唤醒后,会移动到阻塞队列中,等待获取锁.

阅读技巧

2.1 &&和||操作符的短路应用

AQS中有大量&&和||的使用,充分利用了它们的短路特性来避免嵌套if/else.代码看起来很精巧,相应的阅读难度也提高了.以acquire为例

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


//相同效果的if/else实现
public final void acquire(int arg){
            if(!tryAcquire(arg)){
                if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
                    selfInterrupt();
                }
            }

如果使用自然语言描述进入最内层代码的流程 —"当前线程尝试获取锁失败,加入到阻塞队列尾部,首次/被唤醒后尝试获取锁并检查是否被中断,如果被中断,就进行自我中断"

如果考虑所有情况,势必会有一堆if/else,不得不说是很精巧的设置,为了降低阅读难度,可以自行替换为if/else形式方便理解

2.2 线程的"隐式"唤醒

以acquireQueued(addWaiter(Node.EXCLUSIVE), arg)为例(源码为for(;;),这里替换为do while方便理解)

//替换为下面这种形式更容易理解
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
                        //尝试获取锁,如果获取失败阻塞当前线程
             do {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            } while (true)
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

"死循环尝试获取锁,如果失败阻塞线程并设置中断状态"

如果第一次循环时获取锁失败,线程就会被阻塞,那么后续循环是如何触发的呢? 被其他线程唤醒

然而唤醒并不会在acquireQueued方法中体现出来,这里可能会造成迷惑,因此在AQS中如果看到死循环,请结合此情形思考.(PS. 一个相关的知识 线程虚假唤醒的Java演示)

抽象模板

AQS使用了抽象模板设计模式,使用时需要根据功能实现它定义的几个抽象方法

独占式

共享式

AQS同时提供了几个方法供子类使用

实例 TwinLock

TwinLock允许同时最多两个线程拥有锁,是共享式锁对AQS的典型用法

public class TwinLock implements Lock {
    private final Sync sync = new Sync(2);

    private static class Sync extends AbstractQueuedSynchronizer {
        public Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int current = getState();
                int newState = current - arg;
                if (newState < 0 || compareAndSetState(current, newState)) {
                    System.out.println(Thread.currentThread().getName()
                            + " acquired, newState " + newState);
                    return newState;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                int newState = current + arg;
                if (compareAndSetState(current, newState)) {
                    System.out.println(Thread.currentThread().getName()
                            + " released, newState " + newState);
                    return true;
                }
            }
        }

    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        try {
            return sync.tryAcquireSharedNanos(1, 0);
        } catch (InterruptedException ignore) {

        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    public static void main(String[] args) throws InterruptedException {
    }

    private static void tryLockTest() {
        TwinLock lock = new TwinLock();
        Runnable r = () -> {
            boolean success = lock.tryLock();
            if (success) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();

                }
            } else {
                System.out.println(
                        Thread.currentThread().getName() + " acquire failed");
            }
        };
        for (int i = 1; i <= 8; i++) {
            new Thread(r, "t-" + i).start();
        }
    }

    private static void lockTest() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);

        TwinLock lock = new TwinLock();
        Runnable r = () -> {
            lock.lock();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                latch.countDown();
            }
        };
        for (int i = 1; i <= 10; i++) {
            new Thread(r, "t-" + i).start();
        }
        latch.await();
    }

https://blog.csdn.net/vernonzheng/article/details/8275624

https://whitesock.iteye.com/blog/1336409

上一篇 下一篇

猜你喜欢

热点阅读