Java

ReentranLock源码浅析

2020-01-21  本文已影响0人  小金狗子

本文为作者个人理解难免有误,欢迎指正

ReentranLock 的基本构成


private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer{...}

static final class NonfairSync extends Sync{...}

static final class FairSync extends Sync{...}

常用的lock方法具体实现是在FairSync和NonfairSync。Nonfaire体现在lock时尝试直接获取锁,如果获取失败,则使用和公平锁一样通过AbstractQueuedSynchronizer的acquire去加锁。

加锁


final void lock() {

      if (compareAndSetState(0, 1))

        setExclusiveOwnerThread(Thread.currentThread());

      else

        acquire(1);

}

对于锁的获取实质上是改变AbstractQueuedSynchronizer中state属性的值。
改变的过程大致是将state属性在的java内存中的偏移映射为本地内存地址,再通过Atomic::cmpxchg

改值。这些都是native方法,在java层面上可以不关心。简单点讲,本例中我们只需要记住所谓的加锁就是将state值从0改为1。

AbstractQueuedSynchronizer是核心!!!!

到这里,我们需要先看下AbstractQueuedSynchronizer的基本结构。


    private transient volatile Node head;

    /**

    * Tail of the wait queue, lazily initialized.  Modified only via

    * method enq to add new wait node.

    */

    private transient volatile Node tail;

    /**

    * The synchronization state.

    */

    private volatile int state;



Node组成双向链表,在源码中的称呼为CLH队列。


        +------+  prev +-----+       +-----+
        | head | <---- |     | <---- | tail|  
        |      | ----> |     | ----> |     |
        +------+  next +-----+       +-----+

Node的结构


/** Marker to indicate a node is waiting in shared mode */

        static final Node SHARED = new Node();

        /** Marker to indicate a node is waiting in exclusive mode */

        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */

        static final int CANCELLED =  1;

        /** waitStatus value to indicate successor's thread needs unparking */

        static final int SIGNAL    = -1;

        /** waitStatus value to indicate thread is waiting on condition */

        static final int CONDITION = -2;

        /**

        * waitStatus value to indicate the next acquireShared should

        * unconditionally propagate

        */

        static final int PROPAGATE = -3;

        volatile int waitStatus;



        volatile Node prev;

        volatile Node next;

        volatile Thread thread;



        Node nextWaiter;

因为是个双向链表,所以自然有前驱和后继。thread表示尝试加锁的线程。waitStatus的值从-3到1,默认值0(下文用DEFAULT表示)

在本例中我们聚焦SIGNAL。它表示被标记为SIGNAL的节点的后继结点(的线程)需要被唤起!

acquire方法的实现


if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

首先看下tryAcquire方法.以FairSync为例,对于NonfairSync,对比代码发现就少了!hasQueuedPredecessors()这个逻辑


final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

先抛开hasQueuedPredecessors(),从代码来看
如果没获得锁,就做加锁操作,已经获得锁,就对当前state 加acquires(本例中为1,即+1)

为了略微全面一些,我们以公平锁为例.

没有人持有锁时,head和tail都为null,所以也不会有Queued Predecessors,tryAcquire结果为true,acquire直接返回,加锁成功结束。

相同线程执行lock,即锁重入时,对state做递增操作(+1)并state赋值。

如果是不同线程的话,就需要继续执行acquireQueued

先看addWaiter方法


private Node addWaiter(Node mode) {

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);

        return node;

    }



private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // Must initialize

                if (compareAndSetHead(new Node()))

                    tail = head;

            } else {

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

    }   

创建了一个Node,Node是加锁线程的代,我们取名N1。由于head和tail都为null,进入enq方法。首先创建一个Node(取名N0)初始化head和tail,初始化后如下图,


        +------+

        | head |

        | tail |

        +------+

继续循环,走入else后,N1是tail,N0是head


        +------+        +------+

        |  N0  | <----  |  N1  |

        | head | ---->  | tail |

        +------+        +------+

加入成功后就会执行


final boolean acquireQueued(final Node node, int arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

从上图的结构看,node的前驱就是head,但是tryAcquire是不成功的,因为已经被别的线程加锁了,所以走shouldParkAfterFailedAcquire()


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            /*

            * This node has already set status asking a release

            * to signal it, so it can safely park.

            */

            return true;

        if (ws > 0) {

            /*

            * Predecessor was cancelled. Skip over predecessors and

            * indicate retry.

            */

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            /*

            * waitStatus must be 0 or PROPAGATE.  Indicate that we

            * need a signal, but don't park yet.  Caller will need to

            * retry to make sure it cannot acquire before parking.

            */

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

刚开始,我们创建的Node和默认新增的都是没有设置waitStatus的,所以N0(pred)和N1(node)都默认值0。

经过这段代码,实质上更改waitStatus的值


        +------+        +------+

        |  N0  | <----  |  N1  |

        |SIGNAL| ---->  |DEFAULT|

        +------+        +------+

此时acquireQueued继续循环,shouldParkAfterFailedAcquire走入第一个if返回true,执行parkAndCheckInterrupt,线程就会park在LockSupport.park(this);


    private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);

        return Thread.interrupted();

    }

如果别的线程继续加锁


        +------+        +------+      +------+        +------+

        |  N0  | <----  |  N1  | <---- | Nn-1 | <----  |  Nn  |

        |SIGNAL| ---->  |SIGNAL| ----> |SIGNAL| ---->  |DEFAULT|

        +------+        +------+      +------+        +------+

锁释放


    public final boolean release(int arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

    }



    protected final boolean tryRelease(int releases) {

                int c = getState() - releases;

                if (Thread.currentThread() != getExclusiveOwnerThread())

                    throw new IllegalMonitorStateException();

                boolean free = false;

                if (c == 0) {

                    free = true;

                    setExclusiveOwnerThread(null);

                }

                setState(c);

                return free;

    }

首先对在tryRelease中清除当前持有锁的线程信息,更改state的值。


    private void unparkSuccessor(Node node) {

        /*

        * If status is negative (i.e., possibly needing signal) try

        * to clear in anticipation of signalling.  It is OK if this

        * fails or if status is changed by waiting thread.

        */

        int ws = node.waitStatus;

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

        /*

        * Thread to unpark is held in successor, which is normally

        * just the next node.  But if cancelled or apparently null,

        * traverse backwards from tail to find the actual

        * non-cancelled successor.

        */

        Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

    }

unparkSuccessor主要是将当前节点的值归0,然后唤醒下一个节点。


        +------+        +------+

        |  N0  | <----  |  N1  |

        |DEFAULT| ----> |DEFAULT|

        +------+        +------+

当下一节点的被唤醒时,原来part在LockSupport.park(this)上的线程就开始继续执行acquireQueued中的for循环了。重点看一下


final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

N1的前驱还是head,并且因为锁以释放,在公平锁的情况下,能尝试加锁成功。那么原本的N0节点就被移除了,N1就变成了HEAD,同时也是tail。Java

上一篇下一篇

猜你喜欢

热点阅读