从ReentrantLock简单理解线程同步中的一些概念

2019-03-12  本文已影响0人  猫爸iYao

1,锁的内部实现AbstractQueuedSynchronizer(AQSync)

AQSync是实现同步锁的核心。它提供一个双向链表实现的FIFO等待线程队列,一个单链表用于Condition多条件同步,以及一个表示同步状态的int值的state。

public abstract class AbstractQueuedSynchronizer ... {
    //一个双向链表的等待队列和一个单链表的分组队列
    static final class Node {
        volatile Node prev;
        volatile Node next;
        //当前节点的等待线程
        volatile Thread thread;
        //当前节点的等待线程的状态
        volatile int waitStatus; 
        //condition队列,条件分组挂起和唤醒的关键   
        Node nextWaiter;
    }
    //队头引用
    private transient volatile Node head;
    //队尾引用
    private transient volatile Node tail;
    //同步状态,在ReentrantLock中表示当前线程重入次数
    private volatile int state;
    ...
}

同时,AQSync继承自AbstractOwnableSynchronizer(AOSync)。该类只有一个Thread类型的私有属性exclusiveOwnerThread,并提供相应的get/set方法。顾名思义,它是独占锁模式下锁的持有线程的引用。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

理解AQSync的等待队列是理解ReentrantLock的关键。另外,这篇文章对Condition如何工作讲的十分清晰,有兴趣的朋友可以看下。java并发编程之Condition

2,公平锁与非公平锁

ReentrantLock有两个构造器。其中带参构造器可以传入一个布尔型值用于决定锁策略。true使用公平锁,false使用非公平锁。

公平锁(FairSync):当前线程加锁时先检查等待队列是否为空,如果队列为空,则直接获锁;如果队列不为空,则到队尾等待。严格遵循FIFO的获锁顺序。

非公平锁(NonfairSync):当前线程加锁时直接尝试获取空闲的锁,获取不到才到队尾等待。不遵循任何特定的获锁顺序。

为了减少不必要的上下文切换,在加锁时,两种锁并非直接进入等待队列,而是都有一个尝试获锁的过程,只是获锁的策略不同。

公平锁加锁源码

   final void lock() {
           acquire(1);
   }

   public final void acquire(int arg) {
       //尝试获锁,获锁失败则进入等待队列。等待过程中线程中断将继续等待(不响应中断)。
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

   protected final boolean tryAcquire(int var1) {
       Thread var2 = Thread.currentThread();
       int var3 = this.getState();
       if (var3 == 0) {
           //公平锁策略:等待队列为空且锁空闲,直接获锁,减少不必要的入队出队开销。
           if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {
               this.setExclusiveOwnerThread(var2);
               return true;
           }
          //当前线程占用锁,允许重入,状态量增加。
       } else if (var2 == this.getExclusiveOwnerThread()) {
            int var4 = var3 + var1;
            if (var4 < 0) {
                throw new Error("Maximum lock count exceeded");
            }
            this.setState(var4);
            return true;
       }
       //尝试获锁失败
       return false;
   }    

非公平锁加锁源码

    final void lock() {
        //如果锁空闲,直接获锁,不关心等待队列,获锁失败转入正常获锁策略。
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected final boolean tryAcquire(int acquires) {   
        return nonfairTryAcquire(acquires);
    }

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //非公平锁策略:如果锁空闲,直接获锁,不关心等待队列,获锁失败转入正常FIFO获锁策略。
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            //重入锁机制
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    

可见,当一个线程加锁时尝试直接获锁的策略不同,公平锁要求当前等待队列中没有等待线程(!hasQueuedPredecessors())且当前锁空闲(compareAndSetState(0, acquires))。而非公平锁则只判断当前锁是否空闲(compareAndSetState(0, 1))。

因此,当出现锁竞争时,除第一次加锁外,公平锁的每一个线程都有出入队和线程上下文切换的开销。这也是为什么说非公平锁性能更加优异的原因。

然而,公平锁是不是一定会遵循FIFO这一规则呢?并不是,我们也可突破这一“限制”。当我们调用tryLock()方法时,公平锁将与非公平锁一样采用非公平锁策略。这在某些场景十分有用。

tryLock()源码

public boolean tryLock() {
    //直接采用非公平锁获锁策略
    return sync.nonfairTryAcquire(1);
}

看起来似乎非公平锁策略更优异,为什么还要保留公平锁呢?因为非公平锁比公平锁更加不可控,容易造成线程饥饿。

3,自旋锁与阻塞锁

自旋锁和阻塞锁是一组锁竞争失败时线程尝试重新获锁的概念。它们描述的是当线程竞争锁失败后采用何种策略等待重新获锁的问题。

自旋锁是指锁竞争失败后,未获锁线程通过轮询锁状态,不停尝试获锁的策略。这种方式不会令该线程发生上下文切换,即不会丢失CPU时间进入休眠状态。自旋锁适用于抢占式CPU,且加锁代码运行时间小于上下文切换的时间。自旋锁使用不当容易造成死锁,例如,递归调用可能造成死锁。

阻塞锁则是指锁竞争失败后,未获锁线程丢失CPU时间被挂起,等待当前锁唤醒的策略。这种方式会导致该线程发生上下文切换,必须等待当前锁的唤醒。阻塞锁由于需要上下文切换,适用于加锁代码执行时间较长的情景。

从ReentrantLock的源码来看,它一种阻塞锁。

前面我们讲到公平锁和非公平锁的实现时有这样一段代码:

   final void lock() {
           acquire(1);
   }

   public final void acquire(int arg) {
       //尝试获锁,获锁失败则进入等待队列。等待过程中线程中断将继续等待(不响应中断)。
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

这段代码里面我们通过分析tryAcquire(int)方法了解了公平锁与非公平锁在实现上的区别。现在我们通过if条件语句的第二个方法acquireQueued(Node, int)了解ReentrantLock如何实现互斥锁的。

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                //检查当前线程是否处于等待队列的头节点,如果是,直接再次尝试获取锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                //直接获锁失败,检查是否需要暂停。如果需要暂停,则暂停并检查是否中断。如果线程中断,interrupted 置为true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

上面是acquireQueued(Node, int)的代码,乍一看好像是一个自旋锁的样子for (;;)。我们来看一看是不是这样。这个方法有一个循环体,循环体内部首先再次检查当前线程是否处于等待队列的头节点,如果是,直接再次尝试获取锁,如果获取失败,检查是否需要暂停(按照方法名,暂且这么理解)。如果需要暂停,则暂停并检查是否中断。如果线程中断,interrupted 置为true。

可以看到,parkAndCheckInterrupt()是获锁失败后锁策略的关键。他决定这个策略是自旋还是阻塞。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这个方法调用了LockSupport#park(Object),并检查当前线程状态。我们主要看LockSupport#park(Object)方法的逻辑。

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, 0L);
        setBlocker(t, null);
    }

这个方法首先调了setBlocker(Thread t, Object arg),并传入当前线程对象t和AQSync对象。我们看一下这个方法的实现。

    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        U.putObject(t, PARKBLOCKER, arg);
    }

使用了sun.misc.UnsafeAPI修改了t对象(当前线程)的PARKBLOCKER参数的值。

PARKBLOCKER = U.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker"));

很好理解,通过反射获取了Thread类的parkBlocker参数的地址索引,sun.misc.Unsafe便可以修改当前线程对象t的值为LockSupport#park(Object blocker)传入的blocker

接下来,LockSupport#park(Object)调用了U.park(false, 0L);。最终调用当前线程对象的Thread#parkFor$(long nanos)方法,并传入0参数。

    public final void parkFor$(long nanos) {
        synchronized(lock) {
        switch (parkState) {
            case ParkState.PREEMPTIVELY_UNPARKED: {
                parkState = ParkState.UNPARKED;
                break;
            }
            case ParkState.UNPARKED: {
                long millis = nanos / NANOS_PER_MILLI;
                nanos %= NANOS_PER_MILLI;

                parkState = ParkState.PARKED;
                try {
                    lock.wait(millis, (int) nanos);
                } catch (InterruptedException ex) {
                    interrupt();
                } finally {
                    /*
                     * Note: If parkState manages to become
                     * PREEMPTIVELY_UNPARKED before hitting this
                     * code, it should left in that state.
                     */
                    if (parkState == ParkState.PARKED) {
                        parkState = ParkState.UNPARKED;
                    }
                }
                break;
            }
            default /*parked*/: {
                throw new AssertionError("Attempt to repark");
            }
        }
        }
    }

尽管当前线程获锁失败,但仍然是运行状态,因此我们主要看UNPARKED分支。这个分支的逻辑是当前线程调用Object#wait(long, int)进入挂起状态。

综上,线程在acquireQueued(Node, int)的循环体内会直接进入线程挂起状态,直到被唤醒才会继续进行第二次循环。所以说ReentrantLock是阻塞锁。

5,可重入锁与不可重入锁

可重入锁和不可重入锁是线程获锁成功后尝试再次获锁的一组概念。他们与自旋锁和互斥锁是两种不同的概念(自旋锁和阻塞锁是一组锁竞争失败时线程尝试重新获锁的概念)。

可重入锁,顾名思义,是一种使线程在已经持有锁且不释放锁的状态下再次持有该锁的策略。反应在代码上就是锁嵌套。

不可重入锁则与之相反,线程必须先释放锁,然后重新去竞争锁。自旋锁某种程度上也代表着是不可重入锁,因为它不能锁嵌套。

锁的可重入某种程度上减少了线程竞争锁的总次数。因此,可重入锁相对来说更加有优势。ReentrantLock是可重入锁,它通过一个加锁计数器记录重入次数。加锁多少次就必须相应释放锁多少次。它的实现逻辑主要在tryAcquire(int)方法中。

   protected final boolean tryAcquire(int var1) {
       Thread var2 = Thread.currentThread();
       int var3 = this.getState();
       if (var3 == 0) {
           //公平锁策略:等待队列为空且锁空闲,直接获锁,减少不必要的入队出队开销。
           if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {
               this.setExclusiveOwnerThread(var2);
               return true;
           }
          //当前线程占用锁,允许重入,状态量增加。
       } else if (var2 == this.getExclusiveOwnerThread()) {
            int var4 = var3 + var1;
            if (var4 < 0) {
                throw new Error("Maximum lock count exceeded");
            }
            this.setState(var4);
            return true;
       }
       //尝试获锁失败
       return false;
   }    

6,可中断锁与不可中断锁

可中断锁与不可中断锁是一组线程尝试挂起失败(由于中断)后是否继续获锁的策略。

可中断锁是指线程在尝试挂起失败后,响应并抛出中断异常的策略,这种策略将导致线程停止获锁。

不可中断锁是指线程在尝试挂起失败后,不抛出中断异常,而是继续尝试获锁的策略。

ReentrantLock同时提供了这两种策略。

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                //非中断锁,不会立刻响应中断,而是继续尝试获锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                //中断锁,直接抛出中断异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

可以看到,可中断锁会在产生中断后立即抛出中断异常。而不可中断锁会继续请求锁,直到获锁后才处理中断。

7,独占锁与共享锁

独占锁,顾名思义,同一时间只能有一个线程持有锁。

共享锁,与独占锁相反,同一时间可以有多个线程持有锁。

ReentrantLock是独占锁。java.util.concurrent中的共享锁的实现有ReentrantReadWriteLockSemaphore

上一篇下一篇

猜你喜欢

热点阅读