android

多线程知识梳理(11) - 队列同步器实现原理 & 应用

2019-05-09  本文已影响17人  泽毛

一、基本概念

队列同步器在Java并发包中的实现是AbstractQueuedSynchronizer,简称为AQS,它是用来构建锁或者其它同步组件的基础框架。了解其实现原理有助于:

队列同步器AQS和同步组件(ReentrantLockReentrantReadWriteLock等)的区别在于:

队列同步器基于模板方法模式,它的使用方式为如下:

ReentrantLock为例,ReentrantLock为自定义的同步组件,NonfairSyncFairSync就是队列同步器的实现类。

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        //...
    }
    
    static final class NonfairSync extends Sync {
        //...
    }
    
    static final class FairSync extends Sync {
        //...
    }
    
    public void lock() {
        sync.lock();
    }

}

二、同步状态 & 同步队列

在队列同步器中有两个关键的元素:

2.1 同步状态

同步状态,用一个int型表示,访问或者修改同步状态需要使用指定的方法:

2.2 同步队列

同步队列用来完成同步状态的管理,当前线程获取同步状态失败时,会将当前线程以及等待状态等信息构造成一个结点,将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首结点的后继结点的线程唤醒,使其再次尝试获取同步状态。

同步队列中的结点保存了以下的信息:

同步器包含了两个类型结点的引用:头结点和尾结点。设置的区别在于:

同步队列

三、同步器提供的模板方法

3.1 独占式同步状态获取与释放

3.1.1 获取

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感。当同步状态获取成功后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件,代表着当前线程获取了锁。其实现为:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

我们将其工作拆解称为三个部分:

Step 1 tryAcquire(arg)

调用 自定义同步器 实现的tryAcquire(int arg)方法,该方法需要保证线程安全的获取同步状态。

Step 2 addWaiter(Node.EXCLUSIVE), arg)

如果同步状态获取失败(tryAcquire方法返回false),则构造同步结点,并通过addWaiter(Node node)方法加入到同步队列的尾部。为了保证能够线程安全地添加,它采用了两层保护机制:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //1.确保节点能够线程安全地被添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //2.通过死循环来确保节点的正确添加,在"死循环"中只有通过`CAS`将节点设置为尾节点之后,当前线程才能从该方法返回,否则当前线程不断地进行尝试。
        enq(node);
        return node;
    }

Step 3 acquireQueued(Node node, int arg)

调用acquireQueued(Node node, int arg)方法,使得新构造的同步结点以“死循环”的方式获取同步状态。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //1.1 得到当前节点的前驱节点
                final Node p = node.predecessor();
                //1.2 如果当前节点的前驱节点是头节点,只有在这种情况下获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    //1.3 将当前节点设为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //2 前驱结点不是头结点,或者无法获取到同步状态的情况。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里需要注意的有以下几点:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
                pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程。
        LockSupport.park(this);
        //如果当前线程已经被中断,那么返回 true。
        return Thread.interrupted();
    }

3.1.2 释放

当前线程获取同步状态并执行了相应的逻辑后,就需要释放同步状态,并调用unparkSuccessor来唤醒后继结点。

  public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              unparkSuccessor(h);
          return true;
      }
      return false;
  }
  
  private void unparkSuccessor(Node node) {
      int ws = node.waitStatus;
      if (ws < 0)
          node.compareAndSetWaitStatus(ws, 0);
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node p = tail; p != node && p != null; p = p.prev)
              if (p.waitStatus <= 0)
                  s = p;
      }
      if (s != null)
          LockSupport.unpark(s.thread);
  }

3.1.3 小结

由以上两点分析,可以看出独占式获取 & 释放同步状态采用了模板方法的设计模式,AQS内部会基于当前线程创建一个结点,并负责管理线程的入队和出队,线程的阻塞和唤醒,我们只需要重写重要的模板方法:

下面是一个独占式获取 & 释放同步状态的简单实现示例:

/**
 * @author lizejun
 **/
public class ExclusiveLock implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException{
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }


    public static void run() {
        final ExclusiveLock lock = new ExclusiveLock();
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread() {

                @Override
                public void run() {
                    lock.lock();
                    System.out.println("begin:" + index);
                    try {
                        System.out.println("run:" + index);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("end:" + index);
                        lock.unlock();
                    }
                }

            }.start();
        }
    }
}

运行结果为如下,可以看到,处于lock ~ unlock之间的逻辑在同一时间只能由一个线程执行。

独占锁实现示例

假如我们注释掉lock.lock()lock.unlock(),那么打印的结果为,这时不能保证是线程安全的。

独占锁关闭

3.2 共享式获取 & 释放同步状态

3.2.1 获取

共享式获取与独占式获取最主要的区别在于 同一时刻能否有多个线程同时获取到同步状态

调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。锁的实现者需要重写tryAcquireShared方法,如果该方法的返回值大于0,表示能够获取到同步状态。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //1.如果前驱结点是头结点,那么会尝试获取同步状态。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //2.获取同步状态成功,设为头结点。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //3.获取同步状态失败,那么阻塞当前线程。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3.2.2 释放

通过调用releaseShared(int args)方法可以释放同步状态,该方法在释放同步状态后,将会唤醒后续处于阻塞状态的结点。对于支持多个线程同时访问的并发组件,它和独占式的区别在于,tryReleaseShared(int arg)方法必须保证同步状态安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

3.2.3 小结

与独占式获取 & 释放同步状态类似,在继承AQS时主要关注两个重要的模板方法:

下面是一个简单地共享式获取 & 释放同步状态的示例:

/**
 * @author lizejun
 **/
public class TwinsLock implements Lock {

    private Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public static void run() {
        final TwinsLock lock = new TwinsLock();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread() {

                @Override
                public void run() {
                    lock.lock();
                    System.out.println("begin:" + index);
                    try {
                        System.out.println("run:" + index);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("end:" + index);
                        lock.unlock();
                    }
                }

            }.start();
        }
    }
}

运行结果为如下,可以看到在同一时刻只有两个线程能够获取到锁:

共享锁实现示例

3.3 独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)可以超时获取同步状态。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1.计算出截止时间.
        final long deadline = System.nanoTime() + nanosTimeout;
       //2.加入节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //3.取出前驱节点
                final Node p = node.predecessor();
                //4.如果获取成功则直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //5.如果到了超时时间,则直接返回
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //6.如果在自旋过程中被中断,那么抛出异常返回
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (faisled)
                cancelAcquire(node);
        }
    }

四、阻塞和唤醒线程

在前面,我们多次提到了阻塞和唤醒线程,它的实现依靠的是LockSupport工具类,它定义了一组的公共静态方法,提供了最基本的线程阻塞和唤醒功能:

五、Condition

ConditionObject是同步器AQS的内部类,每个Condition对象都包含着一个等待队列,该队列是实现等待/通知功能的关键,其用法也很简单,就是通过awaitsignal/signalAll方法来进行等待和通知。

5.1 等待队列

每一个Condition包含了一个等待队列,该队列结点的类型为AbstractQueuedSynchronizer.Node,与AQS中同步队列结点的类型相同,头结点firstWaiter和尾结点lastWaiter

等待队列

这里需要注意的是 一个AQS对象只有一个同步队列,但是它可以关联到多个同步队列上

5.2 等待

调用Conditionawait()方法的前提是当前线程已经获取了同步状态。它会执行下面的操作:

    public final void await() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        //1.创建新的结点加入等待队列。
        Node node = addConditionWaiter();
        //2.释放同步状态。
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        //3.判断是否在同步队列当中,如果不再则阻塞。
        while (!isOnSyncQueue(node)) {
            //3.1.等待被唤醒,唤醒的方法就是调用 signal 方法。
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //4.被唤醒后从等待队列移到同步队列中,继续参与同步状态的竞争。
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

5.3 通知

调用Conditionsignal()方法的前提条件是当前线程已经获取了同步状态,它会执行下面的操作:

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        //1.加入到同步队列中。    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            //2.唤醒。
            LockSupport.unpark(node.thread);
        return true;
    }

signalAll相当于对等待队列中的每个结点均执行一次signal方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

5.4 生产者消费者模型

生产者消费者模型是多线程协作的经典示例。在这个模型中包含三个角色。

下面是实现的代码:

/**
 * @author lizejun
 **/
public class Demo {

    private static final int MAX_SIZE = 5;

    private final LinkedList<Integer> factory = new LinkedList<>();

    private Lock lock = new ReentrantLock();
    private Condition consumer = lock.newCondition();
    private Condition producer = lock.newCondition();

    private void producer2() {
        lock.lock();
        try {
            while (factory.size() == MAX_SIZE) {
                try {
                    System.out.println("-- factory is full --");
                    producer.await();
                    Thread.sleep(Math.round(Math.random() * 100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("produce product");
            factory.add(factory.size());
            consumer.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private void consumer2() {
        lock.lock();
        try {
            while (factory.size() == 0) {
                try {
                    System.out.println("-- factory is empty --");
                    consumer.await();
                    Thread.sleep(Math.round(Math.random() * 100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("consume product");
            factory.removeLast();
            producer.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void run2() {
        final Demo demo = new Demo();
        Thread pThread = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    demo.producer2();
                }
            }

        };
        Thread cThread = new Thread() {

            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    demo.consumer2();
                }
            }
        };
        pThread.start();
        cThread.start();
    }
}

运行结果为:

生产者消费者模型

5.5 Condition 提供的其它方法

上一篇 下一篇

猜你喜欢

热点阅读