Java基础

并发编程(六)ReentrantlLock实现原理

2021-01-12  本文已影响0人  Timmy_zzh
AQS

1.ReentrantLock的加锁方法lock()

public class ReentrantLock implements Lock, Serializable {
    
    private final ReentrantLock.Sync sync;

    public ReentrantLock() {
        this.sync = new ReentrantLock.NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
    }

    public void lock() {
        this.sync.lock();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {

        abstract void lock();

        final boolean nonfairTryAcquire(int acquire) {}

        protected final boolean tryRelease(int release) {}

        protected final boolean isHeldExclusively() {}

        final ConditionObject newCondition() {}

        final Thread getOwner() {}

        final int getHoldCount() {}

        final boolean isLocked() {}

        private void readObject(ObjectInputStream var1) {}
    }
}
NonfairSync实现源码如下:
    static final class NonfairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        NonfairSync() {
        }

        final void lock() {
            //1.通过cas操作来修改state状态,表示争抢锁的操作
            if (this.compareAndSetState(0, 1)) {
                //2.修改状态成功,则设置当前获得锁状态的线程为当前执行线程
                this.setExclusiveOwnerThread(Thread.currentThread());
            } else {
                //3.修改状态失败,调用AQS的acquire方法进行后续处理
                this.acquire(1);
            }
        }

        protected final boolean tryAcquire(int var1) {
            return this.nonfairTryAcquire(var1);
        }
    }
AbstractQueuedSynchronizer的acquire方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    protected AbstractQueuedSynchronizer() {
    }

    protected final int getState() {
        return this.state;
    }

    protected final void setState(int var1) {
        this.state = var1;
    }

    protected final boolean compareAndSetState(int var1, int var2) {
        return unsafe.compareAndSwapInt(this, stateOffset, var1, var2);
    }
    
    public final void acquire(int var1) {
        if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
            selfInterrupt();
        }
    }
    
    protected boolean tryAcquire(int var1) {
        throw new UnsupportedOperationException();
    }
}
NonfairSync的tryAcquire方法
    static final class NonfairSync extends ReentrantLock.Sync {
        
        protected final boolean tryAcquire(int acquires) {
            return this.nonfairTryAcquire(acquires);
        }
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            //获取当前执行的线程
            Thread current = Thread.currentThread();
            //获得state的值
            int state = this.getState();
            // state为0,表示当前无锁状态
            if (state == 0) {
                //无锁状态下,通过cas操作将state的值设置为1,设置当前线程持有锁
                if (this.compareAndSetState(0, acquires)) {
                    this.setExclusiveOwnerThread(current);
                    return true;
                }
            } 
            // 如果state不为0,说明当前已有线程持有锁,如果当前持有锁的线程就是当前线程
            // 则增加锁的重入次数,将state加1并重新设置
            else if (current == this.getExclusiveOwnerThread()) {
                int newState = state + acquires;
                if (newState < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                this.setState(newState);
                return true;
            }
            return false;
        }
    }

ReentrantLock的lock方法调用流程图:

1.ReentrantLock的lock加锁流程图.png

2.AQS核心功能原理分析

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    
    private volatile int state;
    
    static final class Node {
        ...
    }
}
2.1.state 锁状态

state还有一个功能是实现锁的独占模式或者共享模式

2.2.Node双端队列节点

Node时一个先进先出的双端队列,并且是等待队列,当多线程争用资源被阻塞时会进入此队列。这个队列时AQS实现多线程同步的核心。

AQS中有两个Node指针,分别指向队列的head和tail,分别对应头节点和尾节点。

    static final class Node {
        // 该等待同步的节点处于共享模式
        static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
        // 该等待同步的节点处于独占模式
        static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
        
        // Node中的线程状态,和AQS中的state不同,值范围有1,0,-1,-2,-3五个之
        static final int CANCELLED = 1;
        static final int SIGNAL = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        // 前驱节点
        volatile AbstractQueuedSynchronizer.Node prev;
        // 后继节点
        volatile AbstractQueuedSynchronizer.Node next;
        // 等待锁的线程
        volatile Thread thread;
        // 用于标记当前当前节点是否是共享模式,在Node构造函数中赋值
        AbstractQueuedSynchronizer.Node nextWaiter;

        final boolean isShared() {
            return this.nextWaiter == SHARED;
        }

        final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
            AbstractQueuedSynchronizer.Node var1 = this.prev;
            if (var1 == null) {
                throw new NullPointerException();
            } else {
                return var1;
            }
        }

        Node() {
        }

        Node(Thread var1, AbstractQueuedSynchronizer.Node var2) {
            this.nextWaiter = var2;
            this.thread = var1;
        }

        Node(Thread var1, int var2) {
            this.waitStatus = var2;
            this.thread = var1;
        }
    }

3.当前线程获取锁失败后的流程分析

锁的意义就是使竞争到锁对象的线程执行同步代码,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock时如何实现让线程等待并唤醒的呢?

3.1.AQS的addWaiter方法

当前线程获取锁失败后,会被添加到一个等待队列的末端,源码如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    //将线程以Node的方式添加到队列中,添加在tail节点后面
    private AbstractQueuedSynchronizer.Node addWaiter(AQS.Node var1) {
        // 把当前线程封装到一个新的Node对象中
        AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), var1);
        AbstractQueuedSynchronizer.Node pred = this.tail;
        // 将node插入队列
        if (pred != null) {
            node.prev = pred;   //前继节点为tail
            if (this.compareAndSetTail(pred, node)) {   
                //cas替换当前尾部,成功则返回true并设置tail的next后继节点为新节点
                pred.next = node;
                return node;
            }
        }
        // 插入队列失败,进入enq自旋重试入队
        this.enq(node);
        return node;
    }

    //插入新节点到队列中,如果队列未初始化则先进行初始化,然后再插入
    private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
        while(true) {
            AbstractQueuedSynchronizer.Node t = this.tail;
            if (t == null) {
                // 如果队列从未被初始化,则初始化一个头节点head,并赋值给tail
                if (this.compareAndSetHead(new AbstractQueuedSynchronizer.Node())) {
                    this.tail = this.head;
                }
            } else {
                // 将新节点node插入到tail节点的后面
                node.prev = t;
                if (this.compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}
3.2.AQS的acquireQueued 方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            while(true) {
                // 获取新节点的前继节点
                AbstractQueuedSynchronizer.Node preNode = node.predecessor();
                
                // 检测当前节点的前驱节点是否是头节点head,这是获取锁的资格
                // 如果是头节点的话,则调用tryAcquire尝试获取锁
                // 获取锁成功,则将头节点设置为当前节点
                if (preNode == this.head && this.tryAcquire(arg)) {
                    this.setHead(node);
                    preNode.next = null;
                    failed = false;
                    boolean result = interrupted;
                    return result;
                }

                // 尝试获取锁失败,则根据前驱节点判断是否要阻塞
                // 如果阻塞过程中被中断,则设置interrupted标志位为true
                // shouldParkAfterFailedAcquire 方法在前驱节点状态不为SIGNAL的情况下都会循环重试获取锁
                if (shouldParkAfterFailedAcquire(preNode, node) && this.parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                this.cancelAcquire(node);
            }
        }
    }
}
// 根据前驱节点中的waitStatus 来判断是否需要阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
    // 获取前驱节点的状态
    int ws = pred.waitStatus;
    // 如果前驱节点的状态是SIGNAL(-1),则返回true,表示需要将当前线程挂起
    if (ws == -1) {
        return true;
    } else {
        if (ws > 0) {
            // 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点
            // 当前线程之后会再次回到循环并尝试获取锁
            do {
                node.prev = pred = pred.prev;
            } while(pred.waitStatus > 0);
            pred.next = var1;
        } else {
            // 等待状态为0 或者 PROPAGATE(-3),设置前驱节点的等待状态为SIGNAL,
            // 并且之后会回到循环再次重试获取锁
            compareAndSetWaitStatus(pred, ws, -1);
        }
        return false;
    }
}
waitStatu值 描述
CANCELLED(1) 当前线程因为超时或者中断被取消。这时一个终结态,也就是状态到此为止
SIGNAL(-1) 当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程,这个状态一般都是后继线程来设置前驱节点的
CONDITION(-2) 当前线程在condition队列种
PROPAGATE(-3) 用于将唤醒后的线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点称为头节点之前,是不会跃迁为此状态的
0 表示无锁状态
3.3.线程挂起操作

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
public class LockSupport {
    public static void park(Object var0) {
        Thread var1 = Thread.currentThread();
        setBlocker(var1, var0);
        UNSAFE.park(false, 0L);
        setBlocker(var1, (Object)null);
    }
}
获取锁流程总结:
  1. AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取锁
  2. 如果获取锁失败,通过addWaiter方法将线程构造成Node节点插入到同步队列队尾
  3. 在acquireQueued方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport中的native方法来实现线程阻塞。

4.释放锁操作

public class ReentrantLock implements Lock, Serializable {
    public void unlock() {
        this.sync.release(1);
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {

        protected final boolean tryRelease(int arg) {
            int newState = this.getState() - arg;
            // 判断当前线程是否是持有锁的线程,不是则直接抛出异常
            if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            } else {
                boolean isRelease = false;
                // 锁状态为0,表示当前尝试释放锁成功,接着执行真正的锁释放操作流程
                if (newState == 0) {
                    isRelease = true;
                    this.setExclusiveOwnerThread((Thread)null);
                }
                this.setState(newState);
                return isRelease;
            }
        }
    }
}

public abstract class AbstractQueuedSynchronizer{ 
    public final boolean release(int arg) {
        if (this.tryRelease(arg)) {
            AbstractQueuedSynchronizer.Node h = this.head;
            if (h != null && h.waitStatus != 0) {
                this.unparkSuccessor(h);
            }
            return true;
        } else {
            return false;
        }
    }
}
public abstract class AbstractQueuedSynchronizer {
    
    private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
        // 获取node的等待状态,其实node为头节点
        int ws = node.waitStatus;
        if (ws < 0) {   // 1.将头节点的等待状态设置为0(无锁状态)
            compareAndSetWaitStatus(node, ws, 0);
        }

        // 2.获取头节点的下一个节点
        AbstractQueuedSynchronizer.Node nextNode = node.next;
        if (nextNode == null || nextNode.waitStatus > 0) {
            nextNode = null;
            // 如果下一个节点为null,或者下个节点的状态为cancel(1),则找到队列中不是cancel的节点
            // for循环从队尾开始遍历寻找,找到第一个waitStatus《=0的节点,并进行唤醒
            for(Node t = this.tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    nextNode = t;
                }
            }
        }

        // 3.如果下一个节点不为空,而且等待状态《=0,则调用LockSupport的unpark方法唤醒线程
        if (nextNode != null) {
            LockSupport.unpark(nextNode.thread);
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读