线程并发--AQS抽象队列同步器

2019-01-04  本文已影响94人  叩丁狼教育

本文作者:黄海燕,叩丁狼高级讲师。原创文章,转载请注明出处。

AQS队列同步器英文全称AbstractQueuedSynchronizer,这是一个抽象类,为什么我们今天需要学习这个抽象类呢?这个抽象类它的神奇之处到底是什么呢?我们一起来掀开它的神奇面纱吧!

什么是AQS(AbstractQueuedSynchronizer)?

AQS关系.png

AQS中文翻译为同步器,Lock接口的实现类基本都是通过包含AQS子类对象来完成线程访问控制的。比如说Lock中的获取锁和释放锁操作。说白了就是一把抽象的锁。

Lock通过调用AQS子类的lock方法实现获取锁,unlock方法实现释放锁。

lock方法.png unlock方法.png

1.1 AQS源码分析

1.1.1类字段分析:

    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

1)我们可以看到字段中其实最主要的就是一个volatile 修饰的state(状态),state用于标记锁状态。

2)compareAndSetState方法是使用CAS算法修改state保证state修改时的数据安全。

3)head和tail表示队列的头和尾,也就是锁该类将线程和线程状态封装成一个节点Node存入到同步队列中,结构如下:

叩丁狼教育.png

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

1.1.2方法分析:

叩丁狼教育.png

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

1.1.3锁类型分析:

叩丁狼教育.png

1)公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2)非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。

注意:ReentrantLock初始化默认为非公平锁,非公平锁减少了线程的挂起和恢复运行效率高于公平锁。公平锁可以解决饥饿发生。

1.1.4获取锁操作分析:

final void lock() {//获取锁
    if (compareAndSetState(0, 1))//获取锁成功
        setExclusiveOwnerThread(Thread.currentThread());//独占锁
    else
        acquire(1);//其他线程尝试获取锁
}

acquire****方法:
叩丁狼教育.png

1)tryAcquire()尝试直接去获取资源,如果成功则直接返回;

2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

3)acquireQueued()****方法;重新竞争同步锁

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//自旋
            final Node p = node.predecessor();//获取当前节点的上一个节点
            if (p == head && tryAcquire(arg)) {//上一节点为头节点和获取尝试获取锁成功
                setHead(node);//上一个节点已经获取到锁,将当前阶段设置为头节点,下一次就由自己获取锁
                p.next = null; // 头节点获取到锁离开队列
                failed = false;
                return interrupted;//返回中断状态
            }
             //不是头节点或者获取不到锁
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


shouldParkAfterFailedAcquire****方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)//如果上一节点被唤醒,当前节点就进入等待
        return true;
    if (ws > 0) {//如果上一节点线程被取消了
        do {
            node.prev = pred = pred.prev;//当前阶段尝试跳过上一节点,插个队
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//尝试将上一个节点设置为唤醒状态
    }
    return false;
}
parkAndCheckInterrupt****方法:
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//线程进入等待状态
    return Thread.interrupted();//线程唤醒后,检查自己是否真的处于中断状态,注意:interrupted会清除中断状态
}

1)selfInterrupt(),自我中断,当前线程调用了interrupt方法

流程图如下:

叩丁狼教育.png

16.1.5 释放锁操作分析

release****方法:
public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)//头节点不为null并且等待状态不是为没有状态
            unparkSuccessor(h);//唤醒头节点的后面一个节点
        return true;
    }
    return false;
}
tryRelease****方法:
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//重入锁state大于1,需要调用两次unlock方法,释放两次
    if (Thread.currentThread() != getExclusiveOwnerThread())//如果独占锁不是当前需要释放的线程,抛出异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);//独占锁设置为null,释放锁
    }
    setState(c);
    return free;
}
unparkSuccessor****方法:
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)//等待状态,不是被取消的状态
        compareAndSetWaitStatus(node, ws, 0);//CAS修改回到最初的状态,没有等待的状态下
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//各种等待情况下
        s = null;
           //从队尾开始找不为空和不是当前节点的节点,直到找到当前节点的后面一个节点位置
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//如果t为等待状态下
                s = t;//s设置为t,拿到t这个节点
    }
    if (s != null)//唤醒当前节点的下一个节点
        LockSupport.unpark(s.thread);
}

想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

上一篇下一篇

猜你喜欢

热点阅读