小白架构师之路

多线程篇四(手写一个Lock实现类以及AQS详解)

2018-06-24  本文已影响27人  Albert_Yu

前言
结合我之前写的文章,我们已经学会了如何使用jdk提供的Lock,也知道了一些原理,那么我们自己能不能写一个Lock实现呢?答案是肯定的!
我们分析jdk自己提供的Lock实现类不难发现,他们底层都是基于AbstractQueuedSynchronizer同步器实现的

正文
一、AQS基本使用方法
同步器的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态。AQS的设计是基于模板方法设计模式,使用者需要继承AQS并重写指定的方法,随后将同步器组合在自定义的不同组件的实现中,并且调用同步器提供的模板方法,而这些模板方法将调用使用者重写的方法
通过分析AQS类,我们发现我们能重写的方法也不多


AQS.png

1、重点关注下面几个可重写的方法
(1)、tryAcquire(int arg):获取独占锁
(2)、tryRelease(int arg):释放独占锁
(3)、tryAcquireShared(int arg):获取共享锁
(4)、tryReleaseShared(int arg):释放共享锁
(5)、isHeldExclusively():快速判断当前锁释放被线程独占
2、对同步状态进行更改,这时就需要使用同步器提供的3个final方法来进行操作
(1)、getState()获取同步状态
(2)、setState(int newState)设置同步状态
(3)、compareAndSetState(int expect,int update)原子的设置同步状态
3、自定义Lock实现类

package com.yubin.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定义Lock接口实现类
 */
public class MyLock implements Lock {

    private Sync sync;

    public MyLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        // 尝试获取独占锁
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 尝试是否独占锁
        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 快速判断当前锁释放已被独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }


    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试类

package com.yubin.concurrent;

import com.yubin.thread.basic.threadstate.SleepUtils;

import java.util.concurrent.locks.Lock;

/**
 * 测试自定义的Lock实现类
 *
 * @Author YUBIN
 * @create 2018-06-24
 */
public class TestMyLock {

    private final Lock lock = new MyLock();

    private volatile int count = 100;

    private static class WorkThread extends Thread {
        private TestMyLock myLock;

        public WorkThread(TestMyLock myLock) {
            this.myLock = myLock;
        }

        @Override
        public void run() {
            myLock.test();
        }
    }

    public void test() {
        lock.lock();
        try {
            SleepUtils.second(500);
            System.out.println(Thread.currentThread().getName() + "获取到的count=" + count--);
            SleepUtils.second(500);
            if (count == 98) {
                //test();
            }
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TestMyLock myLock = new TestMyLock();
        // 启5个子线程
        for (int i = 0; i < 5; i++) {
            new WorkThread(myLock).start();
        }
        // 主线程没秒打印一行空格
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1000);
            System.out.println();
        }
    }

}

运行上面测试类的main方法,count值在以我们预期的方法一样依次的减一

Thread-1获取到的count=100

Thread-0获取到的count=99

Thread-4获取到的count=98

Thread-2获取到的count=97

Thread-3获取到的count=96

假如此时我把test()方法中的注释掉的方法放开就会发现所有的子线程都处于waiting状态

"Thread-4" #15 prio=5 os_prio=0 tid=0x00000000586df800 nid=0x31c waiting on condition [0x0000000059b5f000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-3" #14 prio=5 os_prio=0 tid=0x00000000586de800 nid=0x1e14 waiting on condition [0x00000000590bf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-2" #13 prio=5 os_prio=0 tid=0x00000000586d8000 nid=0x304 waiting on condition [0x00000000599bf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-1" #12 prio=5 os_prio=0 tid=0x00000000586d7000 nid=0x1e0c waiting on condition [0x0000000058d2e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:38)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

"Thread-0" #11 prio=5 os_prio=0 tid=0x00000000586d5000 nid=0x196c waiting on condition [0x00000000597cf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000d962b078> (a com.yubin.concurrent.MyLock$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
    at com.yubin.concurrent.MyLock.lock(MyLock.java:57)
    at com.yubin.concurrent.TestMyLock.test(TestMyLock.java:33)
    at com.yubin.concurrent.TestMyLock$WorkThread.run(TestMyLock.java:28)

那么为什么导致所有的子线程都处于waiting状态呢?看我们自己定义的Lock我们很容易会发现原来我们自己定义的Lock实现类不支持锁的可重入,由锁的可重入概念我们知道锁的可重入是以线程为单位和状态变量来实现可重入锁的
接下来我们对自定义的Lock实现类进行改造
4、自定义Lock实现类优化支持锁的可重入

package com.yubin.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定义Lock接口实现类
 * @Author YUBIN
 * @create 2018-06-24
 */
public class MyLock implements Lock {

    private Sync sync;

    public MyLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        // 尝试获取独占锁
        @Override
        protected boolean tryAcquire(int arg) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 获取锁的状态
            int c = getState();
            if (c == 0) {
                // 如果锁的状态是0则以CAS的形式将状态变量设置为1
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // 因为是独占锁,所以同一时刻只能有一个线程能获取到锁,如果当前的锁是被当前线程获取过了,则将状态变量+1
                int nextC = c + arg;
                // 设置新的状态变量
                setState(nextC);
                return true;
            }
            return false;
        }

        // 尝试是否独占锁
        @Override
        protected boolean tryRelease(int arg) {
            // 判断当前锁释放是当前线程锁独占的,如果判断不成立则抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - arg;
            boolean free = false;
            if (c == 0){
                // 如果状态为0了则说明当前线程可以释放对锁的持有了
                setExclusiveOwnerThread(null);
                free = true;
            }
            setState(c);
            return free;
        }

        // 快速判断当前锁释放已被独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }


    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

在改造好的Lock实现类里面我只是对tryAcquire()方法和tryRelease()方法进行了改写。
在tryAcquire方法主要加了如果state不为0的时候,判断当前线程是否已经和锁绑定,已经绑定的话则将state+1同时返回true
在tryRelease方法中主要增加了释放锁的时候是对state变量逐次减一当减到0的时候才将锁与当前线程绑定的状态去除,释放锁。
再次运行测试类的main方法程序可以正常的运行了,是不是很简单,嘻嘻嘻。

二、AQS原理剖析
1、如果线程没有获取到锁,我们的线程是如何等待的呢?AQS又是怎么支持的呢?接下来我们还是回到lock()方法上。
在lock方法中调用了同步器的acquire()方法

public void lock() {
    sync.acquire(1);
}

同步器的acquire方法中的具体实现如下

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg):这个tryAcquire方法执行的是我们自己定义的尝试独占的获取锁方法,如果获取失败就会走acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
addWaiter(Node.EXCLUSIVE):将当前线程封装成Node对象挂在等待队列的尾部

private Node addWaiter(Node mode) {
    // 把当前线程打包成一个Node对象 node中thread为当前线程 mode为null表示当前节点为独占节点 独占锁
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 首先尝试快速的挂尾节点 (假设自己是唯一的,不存在冲突)
    Node pred = tail;
    if (pred != null) {
        // 把当前节点的前一个节点设置为当前尾节点
        node.prev = pred;
        // 以CAS的形式把当前节点设置为尾节点
        if (compareAndSetTail(pred, node)) {
            // 把之前的尾节点的下一个节点设置为当前节点
            pred.next = node;
            // 返回当前节点
            return node;
        }
    }
    // 如果尝试失败就会走全面的入队操作 自旋CAS的模式来挂尾节点
    enq(node);
    return node;
}

enq(node):全面的入队操作

private Node enq(final Node node) {
    // 自旋CAS的形式来挂尾节点
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果AQS同步器对象中的尾节点为空的话则初始化一个空的尾节点,初始化成功的话则将这个初始化尾节点设置为头节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将当前节点的前一个节点设置为尾节点
            node.prev = t;
            // 以CAS的形式将当前节点设置为尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // 返回尾节点
                return t;
            }
        }
    }
}

上面的代码执行完则说明当前线程已经挂在尾节点上了,下图为AQS中的同步队列基本结构图


同步队列的基本结构.png

接下来就会执行acquireQueued(final Node node, int arg)方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋CAS
        for (;;) {
            // 获取当前节点的前一个节点
            final Node p = node.predecessor();
            // 如果当前节点的前一个节点 == AQS对象的头结点则再次尝试获取一下锁
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功了的话则将当前节点设置为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在上面这段代码中就使用到了LockSupport,LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒的功能,而LockSupport也成为构建同步组件的基础工具。LockSupport定义一组以park开头的方法用来阻塞当前线程,以及unPark(Thread thread)方法来唤醒一个被阻塞的线程

2、线程获取到锁并且执行完自己的程序又是怎么释放锁的呢?是否锁的过程又是怎样的呢?AQS又是怎么支持的呢?接下来我们回到unLock()方法上。
在unLock()方法中调用了release()方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

该方法先调用我们自定义的tryRelease()方法,成功的话则会去调用unparkSuccessor(h)方法,在这个方法中会获取同步队列中的头结点的next节点中对应的线程,然后通过LockSupport类中的静态方法unpark(Thread thread)方法将其唤醒。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

总结
根据上面的流程我们总结了如下几幅图
1、尝试获取独占锁


01.png

2、获取独占锁失败将当前线程挂在同步队列的尾部及唤醒之后的操作


1.png
3、释放锁的过程
2.png
3、节点的相关操作图
3.png
上一篇下一篇

猜你喜欢

热点阅读