Java并发编程解析 | 基于JDK源码解析Java领域中并发锁

2022-09-22  本文已影响0人  朝槿木兮

苍穹之边,浩瀚之挚,眰恦之美; 悟心悟性,善始善终,惟善惟道! —— 朝槿《朝槿兮年说》


写在开头

在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。<br />主要原因是,对于多线程实现实现并发,一直以来,多线程都存在2个问题:

因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”<br />简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。

关健术语

<br />本文用到的一些关键词语以及常用术语,主要如下:


基本概述

在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。

在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:

针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。<br />虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。<br />为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。<br />在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:

从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。


一.AQS基础同步器基本理论

在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

<br />一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。<br />在Java领域中,JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:

我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:<br />

综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二. JDK显式锁统一概念模型

在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。

综合Java领域中的并发锁的各种实现与应用分析来看,一把锁或者一种锁,基本上都会包含以下几个方面:

综上所述,大致可以根据上述这些方向,我们便可以清楚🉐️知道Java领域中各种锁实现的基本理论时和实现思想。


四.ReentrantReadWriteLock(读写锁)的设计与实现

在Java领域中,ReentrantReadWriteLock(读写锁)是针对于Java多线程并发控制中引入一个共享锁定义读操作与独占锁定义读操作等场景共同组合构成一把锁来提高并发,主要是基于内置的AQS基础抽象队列同步器实现的一种并发控制工具类。

<br />通过ReentrantReadWriteLock类能获取读锁和写锁,它的读锁是可以多线程共享的共享锁,而它的写锁是排他锁,在被占时不允许其他线程再抢占操作。

1. 设计思想

<br />一般来说,在一些特殊的场景中,比如对于数据的读和写操作,为提高并发性能,总会引入共享锁和独享锁来共同组成一把锁,通常情况下,我们把这类锁成为读写锁(ReadWriteLock) 。<br />简单来说,就是主要考虑读和写操作,读操作不会修改数据,可以利用多个线程进行读操作,一般采用共享锁实现;而写操作会改变数据本身,只能允许一个线程进行操作,因此采用独享锁实现。<br />读写锁(ReadWriteLock) 最大的一个特点就是在内部维护一对锁,一把读锁(ReadLock) ,一把写锁(WriteLock) 。其中,对于线程持有的情况来说,简单可以总结为“读共享,写独占”。

1.1 读写锁的基本理论

虽然读写锁(ReadWriteLock) 之间是有关系的:同一时刻不允许读锁和写锁同时被抢占,二者之间是互斥的。<br />假设现在有N个线程,主要从T(1),T(2),...,一直到T(N)个线程,在读写锁的操作情况如下,其中:

从一定意义上讲,根据读写锁操作的情况的性质分析,获取读锁和写锁的条件可以大致总结为:

但是在某些情况下,可能存在某个线程已经获取并持有读锁,希望能够获取写锁,并且在已经释放读锁时,通常情况下我们称之为读写锁的升级。<br />当然,有升级就会有降级,与之对应的就是读写锁的降级,主要描述的是某个线程已经获取并持有写锁,希望能够获取读锁,并且已经释放写锁。<br />一般来说,对于读写锁的升级与降级,我们一般需要注意的以下两个问题,其中:

1.2 读写锁的实现思想

ReentrantReadWriteLock最早是在JDK1.5版本中提供的,从设计思想上来看,主要包括同步器的工作模式,读锁和写锁等3个核心要素。其中:

2. 基本实现

在ReentrantReadWriteLock类在JDK1.8版本中,对于ReentrantReadWriteLock的基本实现如下:

public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {

        private static final long serialVersionUID = -6992448646407690164L;

        /** ReentrantReadWriteLock锁-内部ReadLock类 */
        private final ReentrantReadWriteLock.ReadLock readerLock;

        /** ReentrantReadWriteLock锁-内部WriteLock类 */
        private final ReentrantReadWriteLock.WriteLock writerLock;

        /** ReentrantReadWriteLock锁-内部同步器 */
        final Sync sync; 

        /** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;

            /** ReentrantReadWriteLock锁-共用状态变量封装-begin*/

            /** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
            static final int SHARED_SHIFT   = 16;

            /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

            /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

            /** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

            /** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
            private transient ThreadLocalHoldCounter readHolds;

            /** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
            private transient HoldCounter cachedHoldCounter;

            /** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
            private transient Thread firstReader = null;

            /** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
            private transient int firstReaderHoldCount;

            /** ReentrantReadWriteLock锁-共用状态变量封装-end*/

            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); // ensures visibility of readHolds
            }

            /** ReentrantReadWriteLock锁-读锁标记*/
            abstract boolean readerShouldBlock();

            /** ReentrantReadWriteLock锁-读锁标记*/
            abstract boolean writerShouldBlock();

            //... 其他代码
        }



        /** ReentrantReadWriteLock锁-无参数构造(默认非公平模式) */
        public ReentrantReadWriteLock() {
            this(false);
        }

        /** ReentrantReadWriteLock锁-有参数构造(可选公平/非公平模式) */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }

        /** ReentrantReadWriteLock锁-获取写锁 */
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }      
        /** ReentrantReadWriteLock锁-获取读锁 */
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


        /** ReentrantReadWriteLock锁-实例化Unsafe支持 */
        private static final sun.misc.Unsafe UNSAFE;

        /** ReentrantReadWriteLock锁-线程偏移量 */
        private static final long TID_OFFSET;

        /** ReentrantReadWriteLock锁-获取线程变量 */
        static final long getThreadId(Thread thread) {
            return UNSAFE.getLongVolatile(thread, TID_OFFSET);
        }

        /** ReentrantReadWriteLock锁-反射机制实例化Unsafe */
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> tk = Thread.class;
                TID_OFFSET = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("tid"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

2.1 基于AQS同步器封装静态内部Sync抽象类

/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    /** ReentrantReadWriteLock锁-共用状态变量封装-begin */

    /** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
    static final int SHARED_SHIFT   = 16;

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    /** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
    private transient ThreadLocalHoldCounter readHolds;

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
    private transient HoldCounter cachedHoldCounter;

    /** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
    private transient Thread firstReader = null;

    /** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
    private transient int firstReaderHoldCount;

    /** ReentrantReadWriteLock锁-共用状态变量封装-end*/

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }


    /** ReentrantReadWriteLock锁-读锁标记*/
    abstract boolean readerShouldBlock();

    /** ReentrantReadWriteLock锁-读锁标记*/
    abstract boolean writerShouldBlock();

    /** ReentrantReadWriteLock锁-独占模式获取读锁*/
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    /** ReentrantReadWriteLock锁-独占模式释放锁*/
    protected final boolean tryAcquire(int acquires) {

        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    /** ReentrantReadWriteLock锁-共享模式释放锁*/
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }

    /** ReentrantReadWriteLock锁-共享模式获取锁*/
    protected final int tryAcquireShared(int unused) {

        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

    /** ReentrantReadWriteLock锁-共享模式获取锁*/
    final int fullTryAcquireShared(Thread current) {

        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

    /** ReentrantReadWriteLock锁-判断是否独占模式*/
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    // Methods relayed to outer class

    /** ReentrantReadWriteLock锁-定义条件变量*/
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    /** ReentrantReadWriteLock锁-获取当前锁的持有者*/
    final Thread getOwner() {
        // Must read state before owner to ensure memory consistency
        return ((exclusiveCount(getState()) == 0) ?
                null :
                getExclusiveOwnerThread());
    }

    /** ReentrantReadWriteLock锁-获取读锁次数统计*/
    final int getReadLockCount() {
        return sharedCount(getState());
    }

    /** ReentrantReadWriteLock锁-判断是否是写锁*/
    final boolean isWriteLocked() {
        return exclusiveCount(getState()) != 0;
    }

    /** ReentrantReadWriteLock锁-获取写锁持有次数统计*/
    final int getWriteHoldCount() {
        return isHeldExclusively() ? exclusiveCount(getState()) : 0;
    }

    /** ReentrantReadWriteLock锁-获取读锁次持有数统计*/
    final int getReadHoldCount() {
        if (getReadLockCount() == 0)
            return 0;

        Thread current = Thread.currentThread();
        if (firstReader == current)
            return firstReaderHoldCount;

        HoldCounter rh = cachedHoldCounter;
        if (rh != null && rh.tid == getThreadId(current))
            return rh.count;

        int count = readHolds.get().count;
        if (count == 0) readHolds.remove();
        return count;
    }

    /** ReentrantReadWriteLock锁-获取读锁*/
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return false;
            int r = sharedCount(c);
            if (r == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return true;
            }
        }
    }

    /** ReentrantReadWriteLock锁-获取写锁*/
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c != 0) {
            int w = exclusiveCount(c);
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        if (!compareAndSetState(c, c + 1))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    /** ReentrantReadWriteLock锁-流处理*/
    private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        readHolds = new ThreadLocalHoldCounter();
        setState(0); // reset to unlocked state
    }

    /** ReentrantReadWriteLock锁-获取状态*/
    final int getCount() { return getState(); }

}

2.2 基于Sync抽象类封装共享状态变量

/** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    /** ReentrantReadWriteLock锁-共用状态变量封装-begin*/
    
    /** ReentrantReadWriteLock锁-共用状态变量封装-共享状态移动位数16 */
    static final int SHARED_SHIFT   = 16;

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的状态大小*/
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁每次加锁的最大次数*/
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    /** ReentrantReadWriteLock锁-共用状态变量封装-写锁的掩码*/
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** ReentrantReadWriteLock锁-共用状态变量封装-本地存储读锁次数*/
    private transient ThreadLocalHoldCounter readHolds;

    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
    private transient HoldCounter cachedHoldCounter;

    /** ReentrantReadWriteLock锁-共用状态变量封装-线程变量*/
    private transient Thread firstReader = null;

    /** ReentrantReadWriteLock锁-共用状态变量封装-首次读锁次数*/
    private transient int firstReaderHoldCount;

    /** ReentrantReadWriteLock锁-共用状态变量封装-end*/
    
    /** ReentrantReadWriteLock锁-共用状态变量封装-构造方法*/
    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }


    /** ReentrantReadWriteLock锁-共用状态变量封装-读锁的状态码值*/
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    /** ReentrantReadWriteLock锁-共用状态变量封装-写锁的状态码值*/
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    /** ReentrantReadWriteLock锁-共用状态变量-统计计数器 */
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }

    /** ReentrantReadWriteLock锁-共用状态变量-本地存储统计副本 */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    //... 其他代码

}

一般来说,AQS基础同步器的共享状态变量是整型的32位,要基于一个AQS基础同步器实现读写锁的共享一个共享变量。<br />其中,最公平的方式设计方式就是读锁与写锁各自占用16位,就意味着读锁占用的是高16位,写锁占用的是低16位的。

但是,在获取读写锁的状态值的时候,还会涉及一些额外的计算,这样的设计方式可能会需要用到位移和逻辑与操作等。

2.3 基于Sync抽象类封装FairSync公平同步器

/** ReentrantReadWriteLock锁-基于Sync抽象类封装FairSync公平同步器 */
static final class FairSync extends Sync {

    private static final long serialVersionUID = -2274990926593161451L;

    /** ReentrantReadWriteLock锁- 实现writerShouldBlock方法*/
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }

    /** ReentrantReadWriteLock锁- 实现readerShouldBlock方法*/
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

2.4 基于Sync抽象类封装NonfairSync非公平同步器


/** ReentrantReadWriteLock锁-基于Sync抽象类封装FairSync公平同步器 */
static final class NonfairSync extends Sync {
    
    private static final long serialVersionUID = -8159625535654395037L;
    
    /** ReentrantReadWriteLock锁- 实现writerShouldBlock方法*/
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }

    /** ReentrantReadWriteLock锁- 实现readerShouldBlock方法*/
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}


2.5 基于Lock接口实现ReadLock读锁内部类

/** ReentrantReadWriteLock锁-基于Lock接口实现ReadLock读锁内部类*/
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-同步器 */
    private final Sync sync;

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-内部构造方法*/
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-获取锁方法(默认共享模式)*/
    public void lock() {
        sync.acquireShared(1);
    }

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-获取锁方法(支持中断机制)*/
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-尝试获取锁(一般模式)*/
    public boolean tryLock() {
        return sync.tryReadLock();
    }


    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-尝试获取锁(支持超时机制)*/
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-释放锁*/
    public void unlock() {
        sync.releaseShared(1);
    }

    /** ReentrantReadWriteLock锁-ReadLock读锁内部类-条件变量*/
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }


    public String toString() {
        int r = sync.getReadLockCount();
        return super.toString() +
            "[Read locks = " + r + "]";
    }
}

2.6 基于Lock接口实现WriteLock写锁内部类

/** ReentrantReadWriteLock锁-基于Lock接口实现WriteLock写锁内部类*/
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-同步器*/
    private final Sync sync;

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-内部构造方法*/
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(独占模式)*/
    public void lock() {
        sync.acquire(1);
    }


    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(可中断)*/
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(一般模式)*/
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-获取锁方法(支持超时机制)*/
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }


    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-释放锁*/
    public void unlock() {
        sync.release(1);
    }



    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-条件变量*/
    public Condition newCondition() {
        return sync.newCondition();
    }


    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-是否独占判断*/
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    /** ReentrantReadWriteLock锁-WriteLock写锁内部类-统计数量*/
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

3. 具体实现

public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {

        private static final long serialVersionUID = -6992448646407690164L;

        /** ReentrantReadWriteLock锁-内部ReadLock类 */
        private final ReentrantReadWriteLock.ReadLock readerLock;

        /** ReentrantReadWriteLock锁-内部WriteLock类 */
        private final ReentrantReadWriteLock.WriteLock writerLock;

        /** ReentrantReadWriteLock锁-内部同步器 */
        final Sync sync; 

        /** ReentrantReadWriteLock锁-实例化Unsafe支持 */
        private static final sun.misc.Unsafe UNSAFE;

        /** ReentrantReadWriteLock锁-线程偏移量 */
        private static final long TID_OFFSET;

        /** ReentrantReadWriteLock锁-基于AQS封装内部同步器 */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;

            static final int SHARED_SHIFT   = 16;

            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

            private transient ThreadLocalHoldCounter readHolds;

            private transient HoldCounter cachedHoldCounter;

            private transient Thread firstReader = null;
            private transient int firstReaderHoldCount;

            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); // ensures visibility of readHolds
            }
            abstract boolean readerShouldBlock();

            abstract boolean writerShouldBlock();

            //... 其他代码
        }



        /** ReentrantReadWriteLock锁-无参数构造(默认非公平模式) */
        public ReentrantReadWriteLock() {
            this(false);
        }

        /** ReentrantReadWriteLock锁-有参数构造(可选公平/非公平模式) */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }

        /** ReentrantReadWriteLock锁-获取写锁 */
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }      
        /** ReentrantReadWriteLock锁-获取读锁 */
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
        /** ReentrantReadWriteLock锁-获取线程变量 */
        public final boolean isFair() {
            return sync instanceof FairSync;
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        protected Thread getOwner() {
            return sync.getOwner();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public int getReadLockCount() {
            return sync.getReadLockCount();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public boolean isWriteLocked() {
            return sync.isWriteLocked();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public boolean isWriteLockedByCurrentThread() {
            return sync.isHeldExclusively();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public int getWriteHoldCount() {
            return sync.getWriteHoldCount();
        }
        /** ReentrantReadWriteLock锁-获取线程变量 */
        public int getReadHoldCount() {
            return sync.getReadHoldCount();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        protected Collection<Thread> getQueuedWriterThreads() {
            return sync.getExclusiveQueuedThreads();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        protected Collection<Thread> getQueuedReaderThreads() {
            return sync.getSharedQueuedThreads();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public final boolean hasQueuedThread(Thread thread) {
            return sync.isQueued(thread);
        }
        /** ReentrantReadWriteLock锁-获取线程变量 */
        public final int getQueueLength() {
            return sync.getQueueLength();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public boolean hasWaiters(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        protected Collection<Thread> getWaitingThreads(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        public int getWaitQueueLength(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
        }

        /** ReentrantReadWriteLock锁-获取线程变量 */
        static final long getThreadId(Thread thread) {
            return UNSAFE.getLongVolatile(thread, TID_OFFSET);
        }

        /** ReentrantReadWriteLock锁-反射机制实例化Unsafe */
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> tk = Thread.class;
                TID_OFFSET = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("tid"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

<br />综上所述,ReentrantReadWriteLock锁是基于AQS基础同步器的共享模式和独享模式共同孵化的产物,支持公平/非公平模式,其中的ReadLock和WriteLock是基于同一个AQS基础同步器来实现,维护了共用状态变量机制。

写在最后

通过对Java领域中,JDK内部提供的各种锁的实现来看,一直围绕的核心主要还是基于AQS基础同步器来实现的,但是AQS基础同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。

但是,实际上,Java对于锁的实现与运用远远不止这些,还有相位器(Phaser)和交换器(Exchanger),以及在Java JDK1.8版本之前并发容器ConcurrentHashMap中使用的分段锁(Segment)。

不论是何种实现和应用,在Java并发编程领域来讲,都是围绕线程安全问题的角度去考虑的,只是针对于各种各样的业务场景做的具体的实现。

一定意义上来讲,对线程加锁只是并发编程的实现方式之一,相对于实际应用来说,Java领域中的锁都只是一种单一应用的锁,只是给我们掌握Java并发编程提供一种思想没,三言两语也不可能详尽。

到此为止,这算是对于Java领域中并发锁的最终章,文中表述均为个人看法和个人理解,如有不到之处,忘请谅解也请给予批评指正。

最后,技术研究之路任重而道远,愿我们熬的每一个通宵,都撑得起我们想在这条路上走下去的勇气,未来仍然可期,与各位程序编程君共勉!

版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。

上一篇 下一篇

猜你喜欢

热点阅读