Java并发编程-JDK并发包
参考资料:《Java高并发程序设计》
1.同步控制
1.扩展了synchronized功能的:重入锁
1.简介
- 使用示例:
import java.util.concurrent.locks.ReentrantLock;
public class Test {
// 声明锁
private static ReentrantLock lock = new ReentrantLock();
private static int i = 0;
private static final Runnable runnable = () -> {
for (int j = 0; j < 100000; j++) {
// 加锁
lock.lock();
try {
i++;
} finally {
// 释放锁
lock.unlock();
}
}
};
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
- 重入锁可以完全替代synchronized关键字。
- 与synchronized相比,重入锁有着明显的操作过程。开发人员必须手动指定何时加锁,何时释放锁。也正因为如此,重入锁对逻辑控制的
灵活性
要远好于synchronized。 - 需要注意的是,在退出临界区时,
必须记得释放锁
,否则,其他线程就没有机会再访问临界区了。 - 重入锁之所以有重入两字,是因为
这种锁是可以被同一个线程反复进入的
。示例中的核心代码可写成下面的形式:
lock.lock();
lock.lock();
try {
i++;
} finally {
lock.unlock();
lock.unlock();
}
- 可重入的目的是为了避免同一个线程在第2次获取锁的时候和自己产生死锁。
- 需要注意的是:如果一个线程多次获得锁,那么在释放锁的时候,也必须释放相同的次数。
- 如果释放锁的次数多,那么会得到一个IllegalMonitorStateException异常。
- 如果加锁的次数多,那么相当于线程还持有这个锁,其他线程仍无法进入临界区。
2.高级功能
1.中断
- 对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况:
- 获得这把锁继续执行
- 保持等待
- 而重入锁提供了另外一种可能:线程可以被
中断
。即在等待锁的过程中,程序可以根据需要取消对锁的请求。 - 下面的代码产生了一个死锁,但得益于锁中断,可以轻易地解决这个死锁:
import java.util.concurrent.locks.ReentrantLock;
import lombok.AllArgsConstructor;
public class Test {
private static final ReentrantLock lock1 = new ReentrantLock();
private static final ReentrantLock lock2 = new ReentrantLock();
@AllArgsConstructor
private static class IntLock implements Runnable {
private int lock;
@Override
public void run() {
try {
if (lock == 1) {
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("interrupted in sleep");
}
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("interrupted in sleep");
}
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
System.out.println("interrupted in business");
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(Thread.currentThread().getId() + ":线程退出");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new IntLock(1));
Thread t2 = new Thread(new IntLock(2));
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();
}
}
// interrupted in business
// 12:线程退出
// 11:线程退出
- 上述代码中对锁的请求,统一使用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应线程中断。
2.锁申请等待限时
- 可以使用tryLock()方法进行一次限时的锁申请:
public class Test {
public static class TimeLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
Thread.sleep(6000);
} else {
System.out.println("get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
TimeLock lock = new TimeLock();
Thread t1 = new Thread(lock);
Thread t2 = new Thread(lock);
t1.start();
t2.start();
}
}
// get lock failed
- 上述代码中的tryLock()方法接收两个参数,一个表示等待时长,另一个表示计时单位。如果申请锁成功,则返回true,否则返回false。
- tryLock()方法也可以不带参数直接运行。这种情况下,如果锁未被其他线程占用,则申请锁成功,返回true,否则不会做任何等待,立即返回false。下面演示了这种用法:
public class Test {
public static class TryLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public TryLock(int lock) {
this.lock = lock;
}
public void tryDoubleLock(ReentrantLock firstLock,
ReentrantLock secondLock,
String param, Consumer<String> consumer) {
while (true) {
if (firstLock.tryLock()) {
try {
if (secondLock.tryLock()) {
try {
consumer.accept(param);
return;
} finally {
secondLock.unlock();
}
}
} finally {
firstLock.unlock();
}
}
}
}
@Override
public void run() {
String param = ":My Job done";
Consumer<String> consumer = str ->
System.out.println(Thread.currentThread().getId() + str);
if (lock == 1) {
tryDoubleLock(lock1, lock2, param, consumer);
} else {
tryDoubleLock(lock2, lock1, param, consumer);
}
}
}
public static void main(String[] args) {
Thread t1 = new Thread(new TryLock(1));
Thread t2 = new Thread(new TryLock(2));
t1.start();
t2.start();
}
}
// 11:My Job done
// 10:My Job done
- 先让t1获得lock1,再让t2获得lock2,接着让t1申请lock2,t2申请lock1。在一般情况下,这会导致t1和t2相互等待,从而引起死锁。但因为使用了tryLock(),线程不会傻傻等待,而是不断重试,直到某个线程同时获得lock1和lock2两把锁。执行上述代码,可以发现两个线程可以很快双双正常执行完毕。
3.公平锁
- 大多数情况下,锁的申请都是非公平的。举个例子:线程t1首先请求了锁A,接着线程t2也请求了锁A,那么锁A可用时,是t1获得锁还是t2获得锁是不确定的。系统会从这个锁的等待队列中随机挑选一个,因此不能保证公平性,有可能会产生饥饿。
- synchronized关键字进行锁控制所产生的锁就是非公平的。
- 重入锁允许通过构造函数对锁的公平性进行设置。fair为true表示公平。
public ReentrantLock(boolean fair)
- 公平锁的实现需要维护一个有序队列,因此锁的实现成本高,性能也非常底下。因此默认情况下锁是非公平的。如果不是特殊的需求,也不需要公平锁。下面的代码演示了公平锁的使用:
public class Test {
public static class FairLock implements Runnable {
public static ReentrantLock fairLock = new ReentrantLock(true);
@Override
public void run() {
while (true) {
try {
fairLock.lock();
System.out.println(Thread.currentThread().getName() + "获得锁");
} finally {
fairLock.unlock();
}
}
}
}
public static void main(String[] args) {
FairLock r1 = new FairLock();
Thread t1 = new Thread(r1, "Thread_t1");
Thread t2 = new Thread(r1, "Thread_t2");
t1.start();
t2.start();
}
}
// 程序运行一段时间(稳定后)的部分输出:
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁
// Thread_t1获得锁
// Thread_t2获得锁
- 而如果把fairLock初始化时的参数改为false,那么根据系统的调度,一个线程会倾向于再次获取已经持有的锁,这种分配方式是高效的,但无公平性可言。
3.总结
- 对ReentrantLock的几个重要方法整理如下:
- lock(): 获得锁,如果锁已经被占用,则等待。
- lockInterruptibly(): 获得锁,但优先响应中断。
- tryLock(): 尝试获得锁,如果成功返回true,失败返回false。该方法不等待,立即返回。
- tryLock(long time, TimeUnit unit): 在给定的时间内尝试获得锁。
- unlock(): 释放锁。
- 从重入锁的实现来看,主要包含三个要素:
- 原子状态。原子状态使用CAS操作(CompareAndSwap)来存储当前锁的状态,判断锁是否已经被别的线程持有。
- 等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。
- 阻塞原语park()和unpark()。这两个方法用来挂起和恢复线程。没有得到锁的线程将会被挂起。有关park()和unpark()可参考线程阻塞工具类LockSupport。
2.重入锁的好搭档:Condition条件
1.简介
- Condition的作用和Object.wait()、Object.notify()的作用是大致相同的。但是Condition和重入锁相关联,而后者是和synchronized关键字合作使用的。
- 通过Lock接口(重入锁就实现了该接口)的newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition实例就可以让线程在合适的时间等待,或在特定的时刻得到通知继续执行。
2.方法
-
await()
调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。调用该方法外,当前线程会释放当前已经获得的锁(这一点与Object.wait方法一致),并且等待其它线程调用该条件对象的signal()或者signalAll()方法(这一点与Object.notify()或Object.notifyAll()很像)。或者在等待期间,当前线程被中断,则wait()方法会抛出InterruptedException并清除当前线程的中断状态。 -
await(long time, TimeUnit unit)
适用条件和行为与await()基本一致,唯一不同点在于,指定时间之内没有收到signal()或signalALL()信号或者线程中断时该方法会返回false;其它情况返回true。 -
awaitNanos(long nanosTimeout)
调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的中断状态;若指定时间内未收到通知,则返回0或负数。 -
awaitUntil(Date deadline)
适用条件与行为与awaitNanos(long nanosTimeout)完全一样,唯一不同点在于它不是等待指定时间,而是等待由参数指定的某一时刻。 -
awaitUninterruptibly()
调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否则调用该方法时会抛出IllegalMonitorStateException。调用该方法后,结束等待的唯一方法是其它线程调用该条件对象的signal()或signalALL()方法。等待过程中如果当前线程被中断,该方法仍然会继续等待,同时保留该线程的中断状态。 -
signal()
用于唤醒一个等待中的线程。类似于Object.notify() -
signalAll()
会唤醒所有在等待中的线程。类似于Object.notifyAll() -
代码演示:
public class Test {
public static class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock(); // 申请并获得锁
condition.await(); // 释放锁
// 收到signal信号后重新申请锁、获得锁
System.out.println("thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
}
public static void main(String[] args) throws Exception {
new Thread(new ReenterLockCondition()).start();
Thread.sleep(2000);
ReenterLockCondition.lock.lock(); // 获得锁
ReenterLockCondition.condition.signal(); // signal通知
ReenterLockCondition.lock.unlock(); // 释放锁,谦让给被唤醒的线程
}
}
// thread is going on
3.允许多个线程同时访问:信号量(Semaphore)
1.简介
- 从广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。
- 信号量主要提供了以下构造函数。在构造信号量对象时,必须指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。
public Semaphore(int permits) // 参数为信号量的准入数
public Semaphore(int permits, boolean fair) // 第二个参数可以指定是否公平
2.方法
-
acquire()
尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。 -
acquireUninterruptibly()
和acquire()方法类似,但是不响应中断。 -
tryAcquire()
尝试获得一个许可,如果成功返回true,失败返回false,不会等待,立即返回。 -
release()
用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。 -
代码演示:
public class Test {
public static class SemaphoreDemo implements Runnable {
final Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
System.out.println(System.currentTimeMillis() + "-" +
Thread.currentThread().getId() + ":done!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemaphoreDemo demo = new SemaphoreDemo();
for (int i = 0; i < 20; i++) {
exec.submit(demo);
}
}
}
// 从输出可以观察出系统以5个线程一组为单位,依次进行输出
4.读写分离锁:ReadWriteLock
- 读写分离锁(简称“读写锁”)可以有效地帮助 减少锁竞争,以提升系统性能。
- 读写锁允许多个线程同时读,但考虑到数据完整性,写写操作和读写操作间依然是需要相互等待和持有锁的。读写锁的访问约束如下表:
- | 读 | 写 |
---|---|---|
读 | 非阻塞 | 阻塞 |
写 | 阻塞 | 阻塞 |
- 在系统中,如果读操作次数远大于写操作,那么读写锁的功效就会非常明显。例如下面的代码:
public class Test {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private static CountDownLatch latch = new CountDownLatch(20);
private int value;
public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock(); // 模拟读操作
Thread.sleep(1000); // 读操作的耗时越多,读写锁的优势就越明显
return value;
} finally {
latch.countDown();
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock(); // 模拟写操作
Thread.sleep(1000);
value = index;
} finally {
latch.countDown();
lock.unlock();
}
}
public static void main(String[] args) throws Exception {
final Test demo = new Test();
Runnable readRunnable = () -> {
try {
demo.handleRead(readLock);
//demo.handleRead(lock); // 使用重入锁而不是读写锁
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writeRunnable = () -> {
try {
demo.handleWrite(writeLock, new Random().nextInt());
//demo.handleRead(lock); // 使用重入锁而不是读写锁
} catch (InterruptedException e) {
e.printStackTrace();
}
};
long startTime = System.currentTimeMillis();
for (int i = 0; i <= 18; i++) {
new Thread(readRunnable).start();
}
for (int i = 19; i < 20; i++) {
new Thread(writeRunnable).start();
}
latch.await();
System.out.println("used time: " + (System.currentTimeMillis() - startTime));
}
}
// 使用读写锁
// used time: 2022
// 使用重入锁
// used time: 20009
- 通过运行结果可以看出,使用读写锁程序大约2秒多就能结束(写线程直接是实际串行的)。而使用重入锁,所有的读和写线程之间都必须相互等待,整个程序的执行时间将长达20余秒。
5.倒计数器:CountDownLatch
-
倒计数器通常用来控制线程等待,它可以让某一个线程等待直到倒计数结束,再执行。
-
上一节介绍读写锁统计用时时,就是使用的倒计数器。此外倒计数器的一种典型场景就是火箭发射。在火箭发射前,为确保万无一失,往往会进行各项设备仪器的检查。只有等所有的检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使点火线程等待所有检查线程全部完工后,再执行。示意图如下:
未命名文件(1).png-12.2kB
-
倒计时器的几个关键方法
// 构造函数接收一个整数作为参数,即当前这个倒计时器的计数个数
public CountDownLatch(int count)
// 计数减一
public void countDown()
// 使线程等待,直到倒计数完成
public void await()
6.循环栅栏:CyclicBarrier
- CyclicBarrier与CountDownLatch类似,也可实现线程间的 计数等待 ,但功能更加强大:这个计数器可以 反复使用。
- 循环栅栏的一个使用场景:比如司令下达命令,要求10个士兵一起去完成一项任务。这时,就会要求10个士兵先集合报道,全部报道完毕后(第一次计数完成),再一起去执行任务,当10个士兵把自己的任务都完成了(第二次计数完成),那么司令再对外宣布:任务完成!
- CyclicBarrier有两个主要的方法:
// parties:计数总数,也就是参与的线程总数
// barrierAction:当计数器完成一次计数后,系统要执行的动作
public CyclicBarrier(int parties, Runnable barrierAction)
// 等待一次计数完成
// 注意抛出的两种异常(见下一节的代码)
public int await() throws InterruptedException, BrokenBarrierException
- 上面的的场景用代码实现如下:
public class Test {
private static final int SOLDIER_NUM = 10;
private volatile static boolean allAssembledFlag = false;
private static final Runnable barrierRun = () -> {
if (allAssembledFlag) {
System.out.println("司令:[士兵" + SOLDIER_NUM + "个,任务完成!]");
} else {
System.out.println("司令:[士兵" + SOLDIER_NUM + "个,集合完毕!]");
}
};
public static class Soldier implements Runnable {
private final CyclicBarrier cyclic;
private String soldierName;
Soldier(CyclicBarrier cyclic, String soldierName) {
this.cyclic = cyclic;
this.soldierName = soldierName;
}
void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldierName + ":任务完成");
}
@Override
public void run() {
try {
// 等待所有士兵到齐
cyclic.await();
allAssembledFlag = true;
doWork();
// 等待所有士兵完成工作
cyclic.await();
} catch (InterruptedException e) {
System.out.println("线程等待时被中断——需要执行响应外部紧急事件的逻辑");
} catch (BrokenBarrierException e) {
System.out.println("当前的CyclicBarrier已经被损坏," +
"系统可能已经无法等待所有线程到齐——需要执行使等待线程就地解散的逻辑");
// 例如当前循环栅栏的计数总数为10,若有一个线程被中断,
// 那我们就会得到1个InterruptedException和9个BrokenBarrierException
}
}
}
public static void main(String[] args) throws Exception {
Thread[] allSoldier = new Thread[SOLDIER_NUM];
CyclicBarrier cyclic = new CyclicBarrier(SOLDIER_NUM, barrierRun);
// 设置屏障点,主要是为了执行这个方法
System.out.println("集合队伍!");
for (int i = 0; i < SOLDIER_NUM; ++i) {
System.out.println("士兵 " + i + " 报道!");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
allSoldier[i].start();
}
}
}
// 集合队伍!
// 士兵 0 报道!
// 士兵 1 报道!
// 士兵 2 报道!
// 士兵 3 报道!
// 士兵 4 报道!
// 士兵 5 报道!
// 士兵 6 报道!
// 士兵 7 报道!
// 士兵 8 报道!
// 士兵 9 报道!
// 司令:[士兵10个,集合完毕!]
// 士兵 3:任务完成
// 士兵 5:任务完成
// 士兵 9:任务完成
// 士兵 7:任务完成
// 士兵 4:任务完成
// 士兵 1:任务完成
// 士兵 8:任务完成
// 士兵 0:任务完成
// 士兵 6:任务完成
// 士兵 2:任务完成
// 司令:[士兵10个,任务完成!]
7.线程阻塞工具类:LockSupport
- LockSupport是一个线程阻塞工具,可以在线程内任意位置让线程阻塞。
- 和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。
- 和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
- LockSupport的 静态方法park() 可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法,它们实现了一个限时的等待。
- 之前提到的有关 suspend()永久卡死线程 的例子,可用LockSupport重写如下:
public class Test {
private static final Object u = new Object();
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
try {
System.out.println("in " + getName());
Thread.sleep(1000);
LockSupport.park();
System.out.println("out " + getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
ChangeObjectThread t1 = new ChangeObjectThread("t1");
ChangeObjectThread t2 = new ChangeObjectThread("t2");
t1.start();
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
// in t1
// out t1
// in t2
// out t2
-
这里只是将原来的suspend()和resume()方法用park()和unpark()方法做了替换。并且为了确保unpark()在park()之前被调用,还让线程多sleep了1秒。但执行这段代码可以发现,它永远可以正常的结束,不会被永久性的挂起。
-
LockSupport不会因unpark()先于park()执行而被永久性挂起的原因,是因为它使用了类似 信号量 的机制。它为每一个线程准备了一个 许可,如果许可 可用,那么park()函数会立即返回,并且 消费掉 这个许可(也就是将许可变为 不可用),如果许可不可用,就会阻塞。而unpark()则可以使一个许可变为可用(但和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。
-
此外,处于park()挂起状态的线程不会像suspend()那样给出一个令人费解的Runnable的状态。它会非常明确地给出一个WAITING状态,而且还会标注是park()引起的:
1.png-312.7kB
-
这种标注使得分析问题非常方便。此外,如果使用park(Object)函数,还可以为当前线程设置一个阻塞对象。这个阻塞对象会出现在线程Dump中,这样分析问题就更方便了。例如将上述代码中的park()改为park(this),那么在线程Dump中,可以看到如下信息:
1.png-379.1kB
-
除了有定时阻塞的功能外,LockSupport.park()还支持中断响应。但和其他接收中断的函数不同,park()函数不会抛出InterruptedException异常。park()函数只会默默的返回,但我们可以从Thread.interrupted()等方法获得中断标记。例如:
public class Test {
private static final Object u = new Object();
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
if (Thread.interrupted()) {
System.out.println(getName() + " 被中断了");
}
System.out.println("out " + getName());
}
}
}
public static void main(String[] args) throws Exception {
ChangeObjectThread t1 = new ChangeObjectThread("t1");
ChangeObjectThread t2 = new ChangeObjectThread("t2");
t1.start();
Thread.sleep(100);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
// in t1
// t1 被中断了
// out t1
// in t2
// out t2
end