已读一些收藏并发

java并发编程(十二)ReentrantLock深入浅出

2021-12-22  本文已影响0人  我犟不过你

一、ReentrantLock简介

1.1 特点

ReentrantLock具有如下的特点:

后面会重点讲解其特点的实现原理。

1.2 代码结构

其代码结构如下图:

image.png

有三个内部类,分别是:Sync、FairSync、NonfairSync。

Sync继承自AbstractQueuedSynchronizer。

AbstractQueuedSynchronizer当中有Node和ConditionObject两个内部类。

通过上图类的字面意思应该能大概知晓前面提到的特点都是在哪里实现的了。

二、原理解析

2.1 可重入

可重入是指同一个线程如果首次获得了这把锁,因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次尝试获得锁时,自己也会被锁挡住。

前面学习的synchronized也是可重入锁。

可重入使用实例如下:

public class Test {

    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        method1();
    }
    public static void method1() {
        lock.lock();
        try {
            System.out.println("method1");
            method2();
        } finally {
            lock.unlock();
            System.out.println("method1 unlock");
        }
    }
    public static void method2() {
        lock.lock();
        try {
            System.out.println("method2");
            method3();
        } finally {
            lock.unlock();
            System.out.println("method2 unlock");
        }
    }
    public static void method3() {
        lock.lock();
        try {
            System.out.println("method3");
        } finally {
            lock.unlock();
            System.out.println("method3 unlock");
        }
    }
}

结果:

method1
method2
method3
method3 unlock
method2 unlock
method1 unlock

需要注意的是,lock.unlock()一定要在finally块的第一行。

还是使用之前的代码,一步步跟踪:

获取锁

    public void lock() {
        sync.lock();
    }

Sync的lock方法有两个实现类,公平锁和非公平锁:

image.png

此处使用的是非公平锁,因为初始化ReetrantLock时,默认使用非公平锁NonfairLock:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

继续源码跟踪,非公平锁当中的lock()方法:

        final void lock() {
            // 此处使用自旋锁,判断当前线程是否持有该锁,如果是0的话,则将值替换成1
            if (compareAndSetState(0, 1))
                // 上述比较成立,设置此线程独占该锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 比较不成立,尝试获取锁
                acquire(1);
        }

当首次执行上锁操作,一定走的的是上面的setExclusiveOwnerThread流程,当线程重入或其他线程尝试获取该锁,走下面的acquire(1):

    public final void acquire(int arg) {
        // 尝试获取锁,
        if (!tryAcquire(arg) &&
            // 使用短路逻辑运算符,当获取失败,就继续向下走,会将线程添加到等待队列当中
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 中断当前线程,其实是设置中断标记
            selfInterrupt();
    }

我们主要关注tryAcquire,看起如何实现锁重入的,忽略中间过程,直接查看如下代码:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取当前线程同步状态,state使用volatile修饰
            int c = getState();
            // 表示没有线程持有锁
            if (c == 0) {
                // 自旋加锁,与前面的过程相同
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程就是持有锁的线程
            else if (current == getExclusiveOwnerThread()) {
                // 对当前状态 加 1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置状态值
                setState(nextc);
                return true;
            }
            return false;
        }

关于unlock释放锁过程,直接放关键代码:

        protected final boolean tryRelease(int releases) {
            // 获取当前状态 减1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 锁重入可以多次,只有当状态减为0,才能释放锁。
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

关于锁重入的原理比较简单,就介绍到这里。

2.2 可中断

ReetrantLock处理提供一个常规的lock()方法之外,还提供了一个可中断的方法lockInterruptibly(),当时用此方法获取锁时,如果持有锁的线程发生中断,则该方法将抛出异常:

public class InterruptTest {

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            System.out.println("启动...");
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("等锁的过程中被打断");
                return;
            }
            try {
                System.out.println("获得了锁");
            } finally {
                lock.unlock();
            }
        }, "t1");
        lock.lock();
        System.out.println("获得了锁");
        t1.start();
        try {
            TimeUnit.SECONDS.sleep(1);
            t1.interrupt();
            System.out.println("执行打断");
        } finally {
            lock.unlock();
        }
    }
}

结果:

获得了锁
启动...
执行打断
等锁的过程中被打断
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at com.cloud.bssp.juc.reetrantlock.InterruptTest.lambda$main$0(InterruptTest.java:19)
    at java.lang.Thread.run(Thread.java:748)

如果使用的是lock()方法,即使线程发生中断,仍然可以获取到锁,且不会抛出任何异常。

2.3 可设置超时时间

ReetrantLock提供了两个获取锁并快速返回的方法,不会一直等待,无论成功失败都将立即返回:

tryLock()测试:

public class TryLockTest {

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            if (!lock.tryLock()) {
                System.out.println("获取锁失败");
                return;
            } else {
                try {
                    System.out.println("获取锁成功");
                } finally {
                    lock.unlock();
                }
            }

        });

        lock.lock();
        try {
            t1.start();
            TimeUnit.SECONDS.sleep(1);
        } finally {
            lock.unlock();
        }
    }
}

结果:

获取锁失败

定时tryLock()如下所示:

public class TryLockTimeTest {

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        Thread t1 = new Thread(() -> {
            try {
                if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println("等待一秒获取锁失败");
                    return;
                } else {
                    try {
                        System.out.println("等待一秒获取锁成功");
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        lock.lock();
        try {
            t1.start();
            System.out.println("等待两秒");
            TimeUnit.SECONDS.sleep(2);
        } finally {
            lock.unlock();
        }
    }
}

结果:

等待两秒
等待一秒获取锁失败

2.4 设置为公平锁

前面就提到过ReentrantLock 默认是不公平的。

之所以使用非公平是因为公平锁一般是没有必要的,而且会降低并发度。

使用如下的方式创建公平锁:

ReentrantLock lock = new ReentrantLock(true);

跟踪器构造器:

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

关注其公平锁实现:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            // 继承自AQS的方法,内部先调用tryAcquire获取锁,获取失败则添加下城到等待队列当中
            acquire(1);
        }

        /**
         * 公平锁版本的tryAcquire
         */
        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取锁的状态
            int c = getState();
            // 0表示锁没有被持有
            if (c == 0) {
                // 判断当前等待队列是否有节点在等待,没有才去竞争
                if (!hasQueuedPredecessors() &&
                    // 比较并替换状态
                    compareAndSetState(0, acquires)) {
                    // 设置当前线程为独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 此处表示锁重入
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

2.5 条件变量

ReentrantLock支持多个条件变量。

如何理解上面这句话?我们在前面学习synchronized时,介绍了其wait方法,当线程调用wait方法时,会从线程的持有者owner变成等待状态,会加入到Monitor的WaitSet当中,当有其他线程再次调用wait,仍然会添加进来。就好比一个公共的休息室一样。

而ReentrantLock的多个条件变量就好比成多个休息室。

ReentrantLock实现多个条件变量要使用到await()/signal()方法,以及conditionObject队列,后面我们慢慢讲解,首先看下其用法:

public class ConditionTest {

    static ReentrantLock lock = new ReentrantLock();

    static Condition Tom = lock.newCondition();

    static Condition Jerry = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {

        new Thread(() -> {

            try {
                lock.lock();
                Tom.await();
                System.out.println("吃到了鱼");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {

            try {
                lock.lock();
                Jerry.await();
                System.out.println("吃到了奶酪");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();

        TimeUnit.SECONDS.sleep(1);

        try {
            lock.lock();
            System.out.println("鱼来了");
            Tom.signal();
        } finally {
            lock.unlock();
        }

        TimeUnit.SECONDS.sleep(1);
        try {
            lock.lock();
            System.out.println("奶酪来了");
            Jerry.signal();
        } finally {
            lock.unlock();
        }
    }
}

结果:

鱼来了
吃到了鱼
奶酪来了
吃到了奶酪

如上所示,有几个重点:

下面我们重点关注下是如何实现的?只讲解重点方法

await方法:

        public final void await() throws InterruptedException {
            // 如果线程状态是中断,则抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            // 将当前线程加入条件等待队列
            Node node = addConditionWaiter();
            // 释放锁的占用
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 当节点不在同步等待队列时
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 获取等待队列的锁 并且不抛出中断异常
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                // 重新设置中断标记
                interruptMode = REINTERRUPT;
            // 清除取消的节点
            if (node.nextWaiter != null) 
                unlinkCancelledWaiters();
            // 如果中断模式不是0,则根据状态决定抛出异常,中断线程还是什么都不执行
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter方法:

        private Node addConditionWaiter() {
            // 当前conditionObject当中的最后一个等待者
            Node t = lastWaiter;
            // 如果最后一个等待者被取消,请清空(不是null,且状态不是等待)
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 将当前线程设置为等待状态的节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

signal方法:

        public final void signal() {
            // 判断线程是否持有了锁,没有则抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            //如果条件队列的第一等待者不是null,执行信号唤醒
            if (first != null)
                doSignal(first);
        }

doSignal方法

        private void doSignal(Node first) {
            do {
                如果第一个节点的下一个等待者是null
                if ( (firstWaiter = first.nextWaiter) == null)
                     //则条件队列的最后一个等待者设置为null
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal方法

    final boolean transferForSignal(Node node) {
        // 比较节点状态是否是condition,是则更新成0,否则返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * 添加节点到同步等待队列
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // 此处等待状态是0,比较并替换状态为SIGNAL
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 解除线程阻塞状态
            LockSupport.unpark(node.thread);
        return true;
    }

关于ReentrantLock就简单介绍这些了,其实应该先学习AQS的,不然可能不太理解源码。

上一篇下一篇

猜你喜欢

热点阅读