Java并发
2.8 Java并发
2.8.1 线程安全性
当多个线程访问某个类时,这个类始终能表现出正确的行为,那么就称这个类是线程安全的。多线程中访问共享可变对象必须加以正确的同步手段。
2.8.2 对象共享和组合
对象共享
关于对象共享,需要明白Java内存模型:在Java中,所有实例域、静态域和数组元素存储在堆内存中,而堆内存在线程之间共享。关于对象共享一般可以如下方式进行:
- 在静态初始化函数中初始化对象引用,静态初始化函数由JVM在类的初始化阶段执行,而JVM内部存在同步机制,使用这种方式初始化对象是线程安全的。
- 将对象引用保存到final域中,final的语义是引用不可以换指,需要注意的是它不能保证对象本身不可变。
- 将对象引用保存到volatile域中或者AtomicReference中,或者一个由锁保护的域中(原子性和可见性)。
需要注意的是,要特别防止对象的逸出:不要在构造函数中逸出this引用,不要在构造函数中启动线程(可以在构造函数中创建线程)。
对象组合
当我们需要在多个线程之间共享对象时,则需要设计线程安全类:找出构成对象状态的所有变量,找出约束状态变量的不变性条件,建立对象状态的并发访问管理策略(Java同步容器类通过同步所有公共方法来保证线程安全性)。
需要特别注意的是,当我们在扩展线程安全类时,要特别注意的事:要和原有的线程安全类使用同一个锁。具体如下所示:
public class ListHelper<T> {
public List<T> mList = new ArrayList<T>;
...
public synchronized boolean putIfAbsent(T t) {
boolean absent = !mList.contains(t);
if(absent) {
mList.add(t);
}
return absent;
}
}
public class ListHelper<T> {
public List<T> mList = new ArrayList<T>;
...
public boolean putIfAbsent(T t) {
synchronized(mList) {
boolean absent = !mList.contains(t);
if(absent) {
mList.add(t);
}
return absent;
}
}
}
前者并不是线程安全的,后者才是正确的做法。
2.8.3 基础构建模块
同步容器类
同步容器实现线程安全性的方式是将状态封装起来,对每个公有方法进行同步。
这里我们简单介绍一下Iterator中的fail-fast机制:Iterator在遍历容器的过程,如果底层的容器发生了结构变化(add或者remove了元素),则会抛出ConcurrentModificationException。需要注意的是:当我们在for-each循环中删除或者添加元素时,会抛出该异常。这是因为Iterator是工作在一个独立的线程中,并且拥有一个mutex锁。Iterator被创建之后会建立一个指向原来对象的单链索引表,当原来的对象数量发生变化时,这个索引表的内容不会同步改变,所以当索引指针往后移动的时候就找不到要迭代的对象,所以按照fail-fast原则Iterator会马上抛出ConcurrentModificationException异常。所以Iterator在工作的时候是不允许被迭代的对象被改变的。但你可以使用Iterator的remove来删除对象,该方法会在删除当前迭代对象的同时维护索引的一致性。
并发容器
同步容器将所有对容器状态的访问都串行化,以实现线程安全性,但是这种方法严重降低并发性。而并发容器是针对多个线程并发访问设计的,增加了对一些常见复合操作的支持,如putIfAbsent,replace等。常用的并发容器有:CopyOnWriteArrayList和ConcurrentHashMap,前者主要是用于同步的List,如ArrayList和LinkedList;后者主要是用于替代同步且基于散列的Map,如HashMap。
队列
队列是一种非常有用的工具,一般有以下三种常用的队列:
- BlockingQueue与生产者-消费者模式(有界队列是很好的资源管理工具),一般常用于实现线程池等。
- SynchronousQueue:它不是一种真正的队列,它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着将元素加入或者移出队列。使用该种队列时,必须有足够的消费者,OKHttp就是使用该种队列实现的线程池,newCachedThreadPool也使用了该种队列。吞吐量要高于其他队列。
- Deque(双端队列)与工作密取,工作密取非常适用于既是消费者又是生产者的情况。
同步工具类
- 闭锁(CountDownLatch):可以延迟线程的执行直到闭锁到达终止状态。一般用于当条件充分之后才继续进行之后的场景:如确保某个计算所需要的资源都初始化后才继续执行,或者某个服务在其依赖的所有其他服务都启动了之后才启动,或者某个操作的所有参与者(如多玩家游戏中的所有玩家)都就绪后再继续执行。需要注意的是闭锁不可以重用。
- FutureTask:一般用作异步任务,可以使用get方法获取任务的结果(get是阻塞的)。
- 信号量:用来控制同时访问某个资源的操作数量,或者同时执行某个制定操作的数量。一般可以用于以下场景:二值信号量可以用作互斥锁(mutex),实现资源池(如数据库连接池),将任何一种容器变为有界容器。
- 栅栏:阻塞一组线程直到某个事件发生,其和闭锁之间的区别:所有线程必须同时到达栅栏位置才可以继续执行,而且栅栏可以重用。闭锁一般用于等待时间,而栅栏用于等待线程。一般常用的有两种栅栏:CyclicBarrier可以让一定数量的线程反复地在栅栏位置汇集,一般常用在并行迭代算法;而Exchanger是一种两方栅栏,各方在栅栏位置交换数据,一般常用在两方执行不对称的操作(例如读和写)。
2.8.4 任务执行、取消和关闭
大多数并发程序都是围绕“任务”来构造的,而理想情况下,任务之间是相互独立的,清晰的任务边界是程序并发的关键。大多数服务器程序使用一种自然的任务边界方式:以独立的客户请求为边界。
任务抽象
Runnable是最基本的任务抽象形式,Runnable的问题是无法返回结果或者抛出异常。
Callable是一种更好的抽象形式,它可以返回一个值,或者抛出异常。
Future表示一个任务的生命周期,并提供了相应的方法来获取任务的状态(完成或者取消),以及获取任务的结果或者取消任务(netty中的Future使用上更方便)。
任务执行策略
Executor框架(线程池)将任务提交和执行策略解耦,同时可以支持多种不同类型的执行策略。而一般任务执行策略需要考虑如下几个问题:
- 在什么线程中执行任务
- 任务按照什么顺序执行(FIFO、LIFO或者优先级)
- 由多少个任务可以并发执行
- 在队列中有多少个任务在等待执行
- 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个任务,以及如何通知上层应用有任务被拒绝
- 任务执行前后,应该进行哪些动作
Executor框架提供了四个工厂方法来创建一个线程池:newFixedThreadPool、newCachedThreadPool、newSingleThreadPool和newScheduledThreadPool。
任务的取消与关闭
任务的提交和启动很简单,但是任务的取消就困难的多。针对任务本身:Future是可以cancel的,而Runnable和Callable是无法cancel的。而针对任务执行的容器:这要分为在使用线程和线程池两种情况:
- 线程:Java没有提供任何机制来安全地终止线程,但它提供了中断,这是一种协作机制,能够使一个线程请求终止另一个线程的当前工作。
- 线程池:线程池需要提供生命周期方法来关闭自己以及其中的线程,也可以使用毒药对象的方式。
这里我们详述一下中断,Thread.interrupt方法并不是中断目标线程,而是通知目标线程应该中断了,而具体是中断还是继续运行,应该由目标线程自己处理。具体来说,对一个线程调用interrupt方法时:
- 如果线程处于被阻塞状态(例如处于sleep, wait, join等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常,仅此而已。
- 如果线程处于正常活动状态,那么会将该线程的中断标志设置为true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。
因此interrupt方法并不能真正的中断线程,需要被调用的线程自己进行配合才行。即一个线程如果有被中断的需求,那么就可以这样做:
- 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自动停止线程。
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
// do more work.
}
});
thread.start();
...
thread.interrupt();
- 在调用阻塞方法时正确处理InterruptedException异常:传递该异常或者恢复中断(当无法传递该异常时,在catch子句中调用Thread.currentThread().interrupt(),因为抛出该异常会清除中断标志位)。一定不要catch后什么都不做。
Thread thread = new Thread(() -> {
//do more work
try {
sleep();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.start();
...
thread.interrupt();
非正常的线程终止
导致线程非正常终止的最主要原因是RuntimeException,一般不建议catch RuntimeException,但是守护线程可以catch RuntimeException。而JVM提供了一个钩子:UncaughtExceptionHandler可以用来获取线程终结的情况。
2.8.5 线程池的使用
线程池的理想大小主要取决于被提交任务的类型:一般计算密集型的任务时,线程池的理想大小是cpu+1;而IO密集型的更应该更大一些。
上文提到Executor框架提供了四种线程池的基本实现,Executor框架也支持各种定制方案:
- 线程的创建和销毁,线程池的基本大小(core size)、最大大小(max size)以及存活时间等因素共同影响线程的创建与销毁。
- 管理队列任务,
- 饱和策略,当使用有界队列时,有界队列填满时,饱和策略开始发挥作用。
- 线程工厂
ThreadPoolExecutor是可扩展的,它提供了几个方法可以进行扩展:beforeExecute、afterExecute和terminated。
2.8.6 原子变量和非阻塞同步机制
独占锁是一种悲观技术,它假设最坏的情况,并且只有在确保其他线程不会造成干扰的情况下才能继续执行。而乐观锁通过借助冲突检测机制来判断更新过程中是否存在来自其他线程的干扰,如果存在,则操作失败,并且可以重试。
乐观锁:比较并交换(CAS),原子的进行比较并设置,其具体含义是:如果V的值是A,则将V的值更新为B,否则不修改并返回V的值(Java中原子量就是这样工作的)。
原子量:相当于一种泛化的volatile变量,提供原子性和可见性的保证。