java并发编程2018年面试Java并发编程

java并发编程之AbstractQueuedSynchroni

2016-12-15  本文已影响978人  miaoLoveCode

引言


AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架。

一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作:

protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);

AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。

注:AQS主要是怎么使用的呢?
在java的同步组件中,AQS的子类一般是同步组件的静态内部类。

AQS是实现同步组件的关键,它俩的关系可以这样描述:同步组件是面向使用者的,它定义了使用者与组件交互的接口,隐藏了具体的实现细节;而AQS面向的是同步组件的实现者,它简化了具体的实现方式,屏蔽了线程切换相关底层操作,它们俩一起很好的对使用者和实现者所关注的领域做了一个隔离。

AQS实现分析


接下来将从实现的角度来具体分析AQS是如何来完成线程同步的。

同步队列分析

AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点。

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}
- ```waitStatus```:节点状态,主要有这几种状态:
Node节点状态
  1. ```CANCELLED```:当前线程被取消;
  2. ```SIGNAL```:当前节点的后继节点需要运行;
  3. ```CONDITION```:当前节点在等待condition;
  4. ```PROPAGATE```:当前场景下后续的acquireShared可以执行;
  5. 0:当前节点在sync队列中等待获取锁。

Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:

AQS成员变量

对于锁的获取,请求形成节点将其挂在队列尾部,至于资源的转移,是从头到尾进行,队列的基本结构就出来了:

AQS同步队列结构

注:设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。

AQS源码解析

AQS提供以下接口以供实现自定义同步器:

  1. protected boolean tryAcquire(int arg)
    独占式获取同步状态,该方法的实现需要先查询当前的同步状态是否可以获取,如果可以获取再进行获取;
  2. protected boolean tryRelease(int arg)
    释放状态;
  3. protected int tryAcquireShared(int arg)
    共享式获取同步状态;
  4. protected boolean tryReleaseShared(int arg)
    共享式释放状态;
  5. protected boolean isHeldExclusively()
    独占模式下,判断同步状态是否已经被占用。

使用者可以根据实际情况使用这些接口自定义同步组件。

AQS提供两种方式来操作同步状态,独占式与共享式,下面就针对性做一下源码分析。

独占式同步状态获取 - acquire实现

独占式同步状态获取

具体执行流程如下:

  1. 调用tryAcquire方法尝试获取同步状态;
  2. 如果获取不到同步状态,将当前线程构造成节点Node并加入同步队列;
  3. 再次尝试获取,如果还是没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

下面我们具体来看一下节点的构造以及加入同步队列部分的代码实现。

addWaiter实现
  1. 使用当前thread构造Node;
  2. 尝试在队尾插入节点,如果尾节点已经存在,就做以下操作:
- 分配引用T指向尾节点;
- 将待插入节点的prev指针指向尾节点;
- 如果尾节点还为T,将当前尾节点设置为带待插入节点;
- T的next指针指向待插入节点。
  1. 快速在队尾插入节点,失败则进入enq(Node node)方法。
enq实现

enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:

  1. 初始化同步队列:如果尾节点为空,分配一个头结点,并将尾节点指向头结点;
  2. 节点入队,通过CAS将节点设置为尾节点,以此在队尾做节点插入。

可以看出,整个enq方法通过“死循环”来保证节点的正确插入。

进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued。对于同步队列中的线程,在同一时刻只能由队列首节点获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。

acquireQueued实现
  1. 获取当前节点的前驱节点;
  2. 如果当前节点的前驱节点是头节点,并且可以获取同步状态,设置当前节点为头结点,该节点占有锁;
  3. 不满足条件的线程进入等待状态。

在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态:

节点自旋获取同步状态

从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的前驱是不是头结点,这样设计使得节点的释放符合FIFO,同时也避免了过早通知。

注:过早通知是指前驱节点不是头结点的线程由于中断被唤醒。

当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,让后续节点可以获取到同步状态,调用方法release(int arg)方法可以释放同步状态。

独占式同步状态释放 - release实现

release实现
  1. 尝试释放状态,tryRelease保证将状态重置回去,同样采用CAS来保证操作的原子性;
  2. 释放成功后,调用unparkSuccessor唤醒当前节点的后继节点线程。
unparkSuccessor实现
取出当前节点的next节点,将该节点线程唤醒,被唤醒的线程获取同步状态。这里主要通过LockSupportunpark方法唤醒线程。

共享式同步状态获取
共享式获取与独占式获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示:

共享式与独占式获取资源
  1. 共享式访问资源时,其他共享式访问都是被允许的;
  2. 独占式访问资源时,在同一时刻只能有一个访问,其他的访问都被阻塞。

AQS提供acquireShared方法来支持共享式获取同步状态。

acquireShared实现
  1. 调用tryAcquireShared(int arg)方法尝试获取同步状态:
    tryAcquireShared方法返回值 > 0时,表示能够获取到同步状态;
  2. 获取失败调用doAcquireShared(int arg)方法进入同步队列。
doAcquireShared实现
  1. 获取当前节点的前驱节点;
  2. 如果当前节点的前驱节点是头结点,并且获取到的共享同步状态 > 0,设置当前节点的为头结点,获取同步状态成功;
  3. 不满足条件的线程自旋等待。

与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态。

共享式同步状态释放 - releaseShared实现

releaseShared实现
  1. 调用tryReleaseShared方法释放状态;
  2. 调用doReleaseShared方法唤醒后继节点;

独占式超时获取 - doAcquireNanos
该方法提供了超时获取同步状态调用,假如在指定的时间段内可以获取到同步状态返回true,否则返回false。它是acquireInterruptibly(int arg)的增强版。

注:为什么不直接进入超时等待呢?原因在于非常短的超时等待是无法做到十分精确的,如果这时候再进入超时等待会让nanosTimeout的超时从整体上表现的不精确,所以,在超时非常短的情况下,AQS都会无条件进入快速自旋;
- 如果nanosTimeout > 1000L,线程通过LockSupport.parkNanos进入超时等待。

整个流程可以总结如下图所示:


doAcquireNanos流程图

后记

在上述对AQS进行了实现层面的分析之后,我们就一个案例来加深对AQS的理解。

案例:设计一个AQS同步器,该工具在同一时刻,只能有两个线程能够访问,其他的线程阻塞。

设计分析:针对上述案例,我们可以这样定义AQS,设定一个初始状态为2,每一个线程获取一次就-1,正确的状态为:0,1,2,0表示新的线程获取同步状态时阻塞。由于资源数量大与1,需要实现tryAcquireSharedtryReleaseShared方法。

代码实现:

public class LockInstance implements Lock {
    
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer { 
       Sync(int state) {
            if (state <= 0) {
                throw new IllegalArgumentException("count must large than 0");
            }
            setState(state);
        }

        @Override
        public int tryAcquireShared(int arg) {
            for (;;) {
                System.out.println("try acquire....");
                int current = getState();
                int now = current - arg;
                if (now < 0 || compareAndSetState(current, now)) {
                    return now;
                }
            }
        }

        @Override
        public boolean tryReleaseShared(int arg) {
            for(;;) {
                System.out.println("try release....");
                int current = getState();
                int now = current + arg;
                if (compareAndSetState(current, now)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.tryReleaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
上一篇下一篇

猜你喜欢

热点阅读