图解多线程设计模式--读书笔记
1.接口和类
I:Excutor
I:ExcutorService
AC:AbstractExecutorService
C:ThreadPoolExecutor
I:ThreadFactory
C:DefaultThreadFactory
C:Executors
I:BlockingQueue
I:Future
2.Thread
1.创建线程的方式
继承Thread类
实现Runnable接口
2.Thread.sleep()
3.线程的互斥处理
1.关键字synchronized
2.互斥(mutual exclusion)
竞态条件(race condition)
线程的互斥机制成为监视(monitor)。另外获取锁有时候也称为“拥有(own)监视”或“持有(hold)锁”。
判断某一个线程是否拥有某一对象的锁:Thread.holdsLock(obj)
sychronized方法默认是this作为锁对象,而sychrosized静态方法使用该对象的类对象作为锁。
4.线程协作
等待队列:所有实例都拥有一个等待队列。它是在实例的wait方法执行后停止操作的线程的队列。
入口队列:针对未持有锁的。
当发生以下任意一种情况时,线程便会退出等待队列:
- 有其他线程的notify方法来唤醒线程
- 有其他线程的notifyAll方法来唤醒线程
- 有其他线程的interrupt方法来唤醒线程
- wait方法超时
wait方法
wait方法让线程进入等待队列。
obj.wait()
这叫做“线程正在obj实例上wait”
wait()->this.wait(); "线程正在this上wait"
若要执行wait方法,线程必须持有锁(这是规则)。但如果线程进入等待队列,便会释放其实例的锁。
等待队列其实是一个虚拟的概念。它既不是实例中的字段,也不用于获取正在实例上等待的线程的列表的方法。
notify方法
notify方法会将等待队列中的一个线程取出。
obj.notify()
那么obj等待队列中的一个线程会被取出,然后退出等待队列。
同wait方法一样,若要执行notify方法,线程必须也要持有要调用的实例的锁。
noftiyAll方法
notifyAll方法会将等待队列中所有的线程取出来。
obj.notifyAll()
同wait和notify一样,notifyAll也只能有持有调用实例的锁的线程调用。
线程状态迁移
Thread.Stat:NEW、RUNNABLE、TERMINATED、WAITING、TIMED_WAITING和BLOCKED
Single Threaded Execution
临界区:我们只允许单个线程执行的程序范围。
使用Single Threaded Execution情况:
- 多线程时
- Shared Resource被多线程访问时
- Shared Resource状态可能会发生变化
- 需要确保安全性时
在使用Single Threaded Execution会发生死锁。
在Single Threaded Execution模式下,满足下列条件就会发生死锁:
- 存在多个SharedResource角色(多个共享资源实例)
- 线程在持有某个SharedResource角色的锁时,还想获取其他Shared Resource资源的锁
- 获取Shared Resource角色的锁的顺序并不固定(Shared Resource的角色是对称的)
Single Threaded Execution会降低程序性能:
- 获取锁花费时间:如果shared resource数量减少,那么要获取的锁的数量就会减少,从而抑制性能下降
- 线程冲突引起的等待:尽量减少临界区的范围,从而减小冲突的概率,从而抑制性能的下降。
不容易发生线程冲突的ConcurrentHashMap
相关的模式
Guarded Suspension模式
Read-Write Lock模式
Immutable 模式
Thread-Specify Storage模式
原子操作
Jav编程规范定义了一些原子操作。例如,char、int等基本类型的赋值和引用都是原子操作。另外,引用类型的对象的赋值和引用也是原子操作。例外:long和double的赋值和引用不是原子操作。实际上,大部分java虚拟机也将long和double的操作实现了原子。
volatile关键字
通过java.util.concurrent.atomic包提供了便于原子操作的类,如AtomicInteger、AtomicLong、AtomicIntegerArray和AtomicLongArray等,这是通过volatile封装的类库。
计数信号量和Semaphore类
Single Threaded Execution模式用于确保某个区域“只能由一个线程执行”。而扩展下该模式,确保某个区域“最多由N个线程执行”。这个时候就要用计数信号量来控制线程数量。还有假设能给使用的资源税有N个,而需要使用这些资源的线程数大于N。这会导致资源竞争,因此需要交通管制。这种情况也需要计数信号量。
Semaphore semaphore = new Semaphore(2);
semaphore.acquire();//没有可用资源时,阻塞
try{
doSomethong();
}finally{
semaphore.release();//释放资源
}
Immutable模式
标准库中的immutable模式:java.lang.String、java.math.BigInteger、java.math.BigDecimal、java.util.regex.Pattern、java.lang.Integer等
Guarded Suspension模式
Queue LinkendList #peek() #remove() #offer()
BlockingQueue LinkedBlockingQueue #take()和#put()互斥
java.util.concurrent包中队列
java.util.concurrent包提供了BlockingQueue接口及其实现类,它们相当于Producer-Consumer模式中的Channel角色。
BlockingQueue接口-阻塞队列
继承Queue接口,拥有offer方法和poll方法等。实际上,实现阻塞功能的方法是BlockingQueue自身的put方法和take方法。
ArrayBlockingQueue-基于数组的BlockingQueue
表示元素个数有最大限制的BlockingQueue。
LinkedBlockingQueue-基于链表的BlockingQueue
表示元素没有最大限制(内存为满情况下)
PriorityBlockingQueue-带有优先级的BlockingQueue
数据的优先级依据Comparable接口的自然排序,或者构造函数出入的Comparator接口的顺序指定。
DelayQueue-一定时间后才可以take的BlockingQueue
DelayQueue用于存储Delayed对象的队列。当从该队列take时,只有各个元素指定的时间到期后才可以take。另外,到期时间最长的元素将被take。
SynchronousQueue-直接传递的BlockingQueue
- 如果Producer先put,在Consumer角色take之前,Producer角色的线程一直阻塞
- 如果Consumer先take,在Producer角色put之前,Consumer角色的线程一直阻塞
ConcurrentLinkedQueue-元素个数没有最大限制的线程安全队列
使用java.util.concurrent.Exchanger类交换缓冲区
Exchanger类用于两个线程安全地交换对象。
Balking模式
Producer-Consumer模式
一般来说,该模式会有多个生产者和多个消费者。当只有一个生产者和一个消费者时又称为Pipe模式。
生产者消费者模式在生产者和消费者之间加入了一个“桥梁角色”,用以消除线程间处理速度的差异。
生产者消费者模式中的角色
Data角色:由生产者负责生产,由消费者消耗。
Producer角色:生产者。生产者生产Data角色,并将其传递给Channel角色。
Consumer角色:消费者。从Channel角色获取Data角色并使用。
Channel角色:通道。Channel保存从Producer角色获取的Data角色,还会响应Consumer角色的请求,传递Data角色。为了安全起见,Channle角色会对Producer角色和Channel角色的访问执行互斥处理。
当Producer传递Data角色给Channel角色时,如果Channle角色的状态不适合接收Data角色,那么Producer角色会一直等待,直到Channel角色的状态适合接收Data角色。
当Consumer角色从Channel角色获取Data角色时,如果Channel角色没有可以提供的Data角色时,Consumer角色会一直等待,直到Channel角色状态可以提供Data角色。
加入了throws InterruptedException的方法
标准库中三个典型方法
- java.lang.Object的wait方法
- java.lang.Thread的sleep方法
- java.lang.Thread的join方法
三个方法和java.lang.Thread的interrupt方法使用。
notify、nofiyAll和interrupt方法的区别
notify/nofifyAll是java.lang.Object的方法,唤醒的是 该实例等待队列中的线程 ,而不是直接指定的线程。notify/notifyAll唤醒的线程会继续执行wait的下一条语句。另外,执行notify/notifyAll时,线程必须获取实例的锁。
interrupt方法是java.lang.Thread的实例方法,可以直接指定线程并唤醒。当被interrupt的线程处于sleep或wait时,会抛出InterruptedException异常。执行interrupt时,不要获取要取消线程的锁。
处于sleep、wait或join时,interrupt后异常抛出的时机:
wait和interrupt:当正在wait的线程被interrupt时(即线程被取消执行时),该线程在重新获取锁之后,抛出InterruptedException。在获取锁之前,不会抛出InterruptedException。
sleep、join和interrupt:当处于sleep的线程或被join的线程被interrupt时,会立马抛出InterruptedException异常。这个和wait状态下interrupt下是有区别的。
调用interrupt一定会抛出InterruptException异常?
不是滴。interrupt方法只是改变了线程的中断状态而已。将线程从非中断状态变为中断状态。
sleep、wait或join方法内部会有线程中断状态的检查。
而只有在执行到sleep、wait或join方法,或者有执行到编写的线程中断状态检查抛出InterruptionException时才会抛出该异常。
java.lang.Thread的实例方法interrupt和类方法interrupted
interrupt方法是将线程从非中断状态切换到中断状态。
Thread.interrupted方法是检查并清除中断状态。若当前线程处于中断状态,则返回true;若处于非中断状态,则返回false。然后将当前线程从中断切换到非中断。
不去使用stop方法
过时方法。stop方法可能会破坏安全性。因为,即使线程正在运行临界区的操作,Thread类也会立即终止该线程的操作。
interrupt方法为什么可以?
interrupt只是改变线程的中断状态。而只有线程执行sleep、wait和join方法时才会抛出InterruptedException异常。
Read-Write Lock模式
多个同学抄黑板,老师等同学抄完再擦黑板。
锁的含义
物理锁:synchrosized关键字定义的锁等
逻辑锁:ReadWriteLock实现的逻辑锁(Before/After模式:防止忘记释放锁)
java.util.concurrent.locks包
物理锁:Lock接口以及三个实现类:ReentrantLock、ReentrantReadWriteLock.ReadLock和ReentrantReadWriteLock.WriteLock。既重入锁、读锁和写锁。
/**
锁必须显示的创建、锁定和释放。
*/
Lock lock = new ReentrantLock();
lock.lock();
try{
}finally{
lock.unlock();
}
ReentrantLock比synchronized的三个特性:
- 等待可中断
- 可实现公平锁:new ReentrantLock(true)
- 锁可以绑定多个条件:ReentrantLock可以绑定多个Condition对象
两种锁的底层策略:
- synchronized:基于一种悲观的并发策略,线程获得是独占锁。独占锁意味着其他线程只能依靠阻塞来等待线程释放锁。而在 CPU 转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起 CPU 频繁的上下文切换导致效率很低。
- 随着指令集的发展,我们有了另一种选择:基于冲突检测的乐观并发策略,通俗地讲就是先进性操作,如果没有其他线程争用共享数据,那操作就成功了,如果共享数据被争用,产生了冲突,那就再进行其他的补偿措施(最常见的补偿措施就是不断地重拾,直到试成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步被称为非阻塞同步。ReetrantLock 采用的便是这种并发策略。
Java 5 中引入了注入 AutomicInteger、AutomicLong、AutomicReference 等特殊的原子性变量类,它们提供的如:compareAndSet()、incrementAndSet()和getAndIncrement()等方法都使用了 CAS 操作。因此,它们都是由硬件指令来保证的原子方法。
可中断锁
- 忽略中断锁:与synchronized实现的互斥锁一样,不能中断。
- 响应中断锁:可以响应中断。
如果某一线程 A 正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程 B 不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,如果此时 ReetrantLock 提供的是忽略中断锁,则它不会去理会该中断,而是让线程B继续等待,而如果此时 ReetrantLock 提供的是响应中断锁,那么它便会处理中断,让线程 B 放弃等待,转而去处理其他事情。
ReentrantLock lock = new ReentrantLock();
...........
lock.lockInterruptibly();//获取响应中断锁
try {
//更新对象的状态
//捕获异常,必要时恢复到原来的不变约束
//如果有return语句,放在这里
}finally{
lock.unlock(); //锁必须在finally块中释放
}
public class Buffer {
private Object lock;
public Buffer() {
lock = this;
}
public void write() {
synchronized (lock) {
long startTime = System.currentTimeMillis();
System.out.println("开始往这个buff写入数据…");
for (;;)// 模拟要处理很长时间
{
if (System.currentTimeMillis()
- startTime > Integer.MAX_VALUE) {
break;
}
}
System.out.println("终于写完了");
}
}
public void read() {
synchronized (lock) {
System.out.println("从这个buff读数据");
}
}
public static void main(String[] args) {
Buffer buff = new Buffer();
final Writer writer = new Writer(buff);
final Reader reader = new Reader(buff);
writer.start();
reader.start();
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (;;) {
//等5秒钟去中断读
if (System.currentTimeMillis()
- start > 5000) {
System.out.println("不等了,尝试中断");
reader.interrupt(); //尝试中断读线程
break;
}
}
}
}).start();
// 我们期待“读”这个线程能退出等待锁,可是事与愿违,一旦读这个线程发现自己得不到锁,
// 就一直开始等待了,就算它等死,也得不到锁,因为写线程要21亿秒才能完成 T_T ,即使我们中断它,
// 它都不来响应下,看来真的要等死了。这个时候,ReentrantLock给了一种机制让我们来响应中断,
// 让“读”能伸能屈,勇敢放弃对这个锁的等待。我们来改写Buffer这个类,就叫BufferInterruptibly吧,可中断缓存。
}
}
class Writer extends Thread {
private Buffer buff;
public Writer(Buffer buff) {
this.buff = buff;
}
@Override
public void run() {
buff.write();
}
}
class Reader extends Thread {
private Buffer buff;
public Reader(Buffer buff) {
this.buff = buff;
}
@Override
public void run() {
buff.read();//这里估计会一直阻塞
System.out.println("读结束");
}
}
import java.util.concurrent.locks.ReentrantLock;
public class BufferInterruptibly {
private ReentrantLock lock = new ReentrantLock();
public void write() {
lock.lock();
try {
long startTime = System.currentTimeMillis();
System.out.println("开始往这个buff写入数据…");
for (;;)// 模拟要处理很长时间
{
if (System.currentTimeMillis()
- startTime > Integer.MAX_VALUE) {
break;
}
}
System.out.println("终于写完了");
} finally {
lock.unlock();
}
}
public void read() throws InterruptedException {
lock.lockInterruptibly();// 注意这里,可以响应中断
try {
System.out.println("从这个buff读数据");
} finally {
lock.unlock();
}
}
public static void main(String args[]) {
BufferInterruptibly buff = new BufferInterruptibly();
final Writer2 writer = new Writer2(buff);
final Reader2 reader = new Reader2(buff);
writer.start();
reader.start();
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (;;) {
if (System.currentTimeMillis()
- start > 5000) {
System.out.println("不等了,尝试中断");
reader.interrupt(); //此处中断读操作
break;
}
}
}
}).start();
}
}
class Reader2 extends Thread {
private BufferInterruptibly buff;
public Reader2(BufferInterruptibly buff) {
this.buff = buff;
}
@Override
public void run() {
try {
buff.read();//可以收到中断的异常,从而有效退出
} catch (InterruptedException e) {
System.out.println("我不读了");
}
System.out.println("读结束");
}
}
class Writer2 extends Thread {
private BufferInterruptibly buff;
public Writer2(BufferInterruptibly buff) {
this.buff = buff;
}
@Override
public void run() {
buff.write();
}
}
条件变量实现线程间协作
在生产者——消费者模型一文中,我们用 synchronized 实现互斥,并配合使用 Object 对象的 wait()和 notify()或 notifyAll()方法来实现线程间协作。Java 5 之后,我们可以用 Reentrantlock 锁配合 Condition 对象上的 await()和 signal()或 signalAll()方法来实现线程间协作。在 ReentrantLock 对象上 newCondition()可以得到一个 Condition 对象,可以通过在 Condition 上调用 await()方法来挂起一个任务(线程),通过在 Condition 上调用 signal()来通知任务,从而唤醒一个任务,或者调用 signalAll()来唤醒所有在这个 Condition 上被其自身挂起的任务。另外,如果使用了公平锁,signalAll()的与 Condition 关联的所有任务将以 FIFO 队列的形式获取锁,如果没有使用公平锁,则获取锁的任务是随机的,这样我们便可以更好地控制处在 await 状态的任务获取锁的顺序。与 notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定唤醒与自身 Condition 对象绑定在一起的任务。
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
class Info{ // 定义信息类
private String name = "name";//定义name属性,为了与下面set的name属性区别开
private String content = "content" ;// 定义content属性,为了与下面set的content属性区别开
private boolean flag = true ; // 设置标志位,初始时先生产
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition(); //产生一个Condition对象
public void set(String name,String content){
lock.lock();
try{
while(!flag){
condition.await() ;
}
this.setName(name) ; // 设置名称
Thread.sleep(300) ;
this.setContent(content) ; // 设置内容
flag = false ; // 改变标志位,表示可以取走
condition.signal();
}catch(InterruptedException e){
e.printStackTrace() ;
}finally{
lock.unlock();
}
}
public void get(){
lock.lock();
try{
while(flag){
condition.await() ;
}
Thread.sleep(300) ;
System.out.println(this.getName() +
" --> " + this.getContent()) ;
flag = true ; // 改变标志位,表示可以生产
condition.signal();
}catch(InterruptedException e){
e.printStackTrace() ;
}finally{
lock.unlock();
}
}
public void setName(String name){
this.name = name ;
}
public void setContent(String content){
this.content = content ;
}
public String getName(){
return this.name ;
}
public String getContent(){
return this.content ;
}
}
class Producer implements Runnable{ // 通过Runnable实现多线程
private Info info = null ; // 保存Info引用
public Producer(Info info){
this.info = info ;
}
public void run(){
boolean flag = true ; // 定义标记位
for(int i=0;i<10;i++){
if(flag){
this.info.set("姓名--1","内容--1") ; // 设置名称
flag = false ;
}else{
this.info.set("姓名--2","内容--2") ; // 设置名称
flag = true ;
}
}
}
}
class Consumer implements Runnable{
private Info info = null ;
public Consumer(Info info){
this.info = info ;
}
public void run(){
for(int i=0;i<10;i++){
this.info.get() ;
}
}
}
public class ThreadCaseDemo{
public static void main(String args[]){
Info info = new Info(); // 实例化Info对象
Producer pro = new Producer(info) ; // 生产者
Consumer con = new Consumer(info) ; // 消费者
new Thread(pro).start() ;
//启动了生产者线程后,再启动消费者线程
try{
Thread.sleep(500) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
new Thread(con).start() ;
}
}
JAVA1.5 locks包中提供了实现Read-Write Lock模式的ReadWriteLock接口和ReentrantReadWriteLock类。
- readLock().lock()
- readLock().unlock()
- writeLock().lock()
- writeLock().unlock()
ReadWriteLock rwl = new ReentrantReadWriteLock();
rwl.writeLock().lock() //获取写锁
rwl.readLock().lock() //获取读锁
ReetrantReadWriteLock类主要特征:
- 公平性:可以选择锁的获取顺序是否要设为公平的(fair).如果创建为公平的,那么等待时间久的线程将优先获取锁。
- 可重入性:ReetrantReadWriteLock类的锁时可重入的。也就是说。Reader角色的线程可以获取”用于写入的锁”,Writer角色可以获取“用于读取的锁”。
- 锁降级:ReeetrantReadWriteLock类可以按照如下顺序将“用于写入的锁”降级为“用于读取的锁”。 “用于写入的锁”->“用于读取的锁”->“释放用于写入的锁”。而”用于读取的锁”不能升级为“用于写入的锁”
Thread-Per-Message
java.util.concurrent.ThreadFactory
//创建一个线程工厂实力类
ThreadFactory threadFactory = new ThreadFactory(){
public Thread new Thread(Runnable r){
return new Thread(r);
}
}
threadFactory.newThread(new Runnable(){
public void run(){
//doSomething
}
})
java.util.concurrent.Executors类获取的ThreadFactory
它有好多静态方法。比如.Executors.defaultThreadFactory()
//通过Executors获取ThreadFactory
ThreadFactory threadFactory = Executors.defalutThreadFactory();
java.util.concurrent.Executor接口
方法 void execute(Runnable r)
Executor接口将某些“处理的执行”抽象化了,参数Runnable对象表示“执行的处理”的内容。
ThreadFactory接口隐藏了线程创建的细节,但并未隐藏创建线程的操作。而Executor接口创建线程的操作也可以隐藏起来。
Executor executor = new Executor(){
public void execute(Runnable r){
new Thread(r).start();
}
};
executor.execute(new Runnable(){
//doSomething
})
java.util.concurrent.ExecutorService接口
继承自Executor
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService几个方法
- execute(Runnable):无返回值。
- submit(Runnabel):返回值Future,future.get()为null
- submit(Callable<T> task):返回值Future<T> future, future.get()有返回值。
ExecutorService exexutorService = Executors.newCachedThreadPool();
try{
executorService.execute(new Runnable(){
public void run(){
//doSomething
}
});
executorService.execute(new Runnable(){
public void run(){
//doSomething
}
});
}finally{
executorService.shutDown();
}
java.util.concurrent.ShceduledExecutorService
它有一个shcedule方法
shcedule(Runnable r, long delay, TimeUnit unit)
ScheduledExecutorService executorService = Executors.newShceduledThreadPool(5);
try{
executorService.shcedule(new Runnable(){
public void run(){
//doSomething
}
}, 3L, TimeUnit.SECONS);
}finally{
executorService.shutDown();
}
WorkerThread模式
WorkerThread模式中的角色
- Client(委托者)
- Channel(通信线路)
- Worker(工人)
- Request(请求)
扩展思路
- 提高吞吐量
- 容量控制
- Worker角色的数量
- Request角色的数量
- 调用与执行的分离
WorkerThread和事件分发线程
点击按钮或者移动鼠标的操作被称为“事件(event)”。比如用ActionEvent类的实例表示一个事件。一系列事件就会存储在事件队列中。
进行下类比。
- 事件对应于Request角色。
- 事件队列对应Channel角色。
- 事件分发线程对应于Worker角色(事件分发线程只有一个)
事件分发线程只有一个,并不能体现出多线程的优点,但是这种设计使我们无需在事件分发线程中要执行的方法中实现工人线程间的互斥处理。
java.util.concurrent包和WorkerThreat模式的关系
java.util.concurrent.ThreadPoolExecutor类
ThreadPoolExecutor可以轻松实现WorkerThead模式。
不过通常使用Executors的静态方法实现比较容易
ExecutorService fixeThreadPool = Executors.newFixedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();
Future模式
Future模式角色
- Client
- Host
- VirtualData(即时返回):在当前线程创建,在新开线程设置数据RealData,在当前线程同步获取,并阻塞等待获取RealData
- RealData(异步返回,通过VirtualData获取)
java.util.concurrent包和Future模式
java.util.concurrent.Callable接口。Callable接口声明了call方法,call方法和Runnable接口的run方法相似,不同的call方法有返回值。Callable<String>表示Callable接口call方法的返回值类型是String类型。
java.util.concurrent.Future接口相当于Future(VirtualData)角色.Future接口声明了获取数据的get方法,没有声明设置值的方法。设置值的方法要在实现Future接口的类中声明。Future<String>表示Future接口get方法的返回值类型是String。除了get方法,Future接口还声明了用于中断运行的cancel方法。
java.util.concurrent.FutureTask类是实现了Future接口的标准类。FutureTask类声明了用于获取值的get方法,用于中断运行的cancel方法,用于设置值的set方法,以及用于设置异常的setException方法。此外,FutureTask类还实现了Runnable接口,所以它还声明了run方法。
FutureTask<RealData> futureTask = new FutureTask(new Callable(){
pulbic RealData call(){
return new RealData();
}
});
new Thread(futureTask).start();
/**
new Thread(Runnable r).start()后会调用Runnable(FutureTask)中run方法,
而FutureTask的run方法又会调用Callable中call方法,
然后会通过FutureTask方法的set方法把call方法返回值进行设置。
而后就可以通过在当前线程通过FutureTask的实例通过get方法获取该值了。
*/
Two-Phase Termination 模式
public class CanShutDownThread extend Thread{
private volatile boolean shutDwonRequested = false;
//终止方法
public void shutDownRequest(){
shutDownRequested = true;
interrupt();
}
//是否终止
public boolean isShutDownRequested(){
return shutDownRequested;
}
public final void run(){
try{
while(!shutDownRequested){
//doSomething
}
}catch(InterruptedExcepton exception){
}finally{
doShutDown();
}
}
private void doShutDown(){
//doSomethingBeforeShutDown
}
}
java.util.concurrent.ExecutorService 和 Two-Phase Termination模式
ExecutorService有isShutDown方法和isTerminated方法
- 线程操作中 : isShutDwon:false,isTerminated:false
- 线程终止ing: isShutDown:true,isTeminated:false
- 终止: isShutDown:true,isTeminated:true
捕获程序整体终止时
- 未捕获的异常的处理器:Thread.setDefaultUncaughtException()
- 退出钩子:Runtime.getRuntime().addShutDownHook()
//Main方法
public static void main(String[] args){
//设置异常处理器
Thread.setDefalutUncaughtExceptionHandler(new Thread.UncaughtExcptionHandler(){
public void uncaughtException(Thread thread,Throwable exception){
//doSomething
}
});
//添加退出钩子
Runtime.getRuntime().addShutDownHook(new Thread(){
public void run(){
//doSomething
}
});
}
优雅地终止线程
- 安全地终止(安全性)
- 必定会进行终止处理(生存性)
- 发出终止请求后尽快进行终止处理(响应性)
中断状态与InterruptedException异常的转换
- 中断状态->InterruptedException
if(Thread.interrupted()){
throw new InterruptedException();
}
不想清楚中断状态时
if(Thead.currentThread().isInterrupted()){
//
}
- InterruptedException->中断状态的转换
try{
Thread.sleep(1000);
}catch(InterruptedException exception){
}
上边代码中,被抛出的InterruptedException异常将被忽略。如果某个线程正在执行sleep时,被其他线程中断了,则“已被中断”这个信息将丢失。
如果想要防止“已被中断”这个信息丢失,线程可以再次中断自己。
try{
Thread.sleep(1000);
}catch(InterruptedException exception){
Thread.currentThread().interrupt();
}
这就相当于从InterruptedException到中断状态的转换。
- InterruptedException异常->InterruptedException异常
InterruptedException excetion = null;
try{
Thread.sleep(1000);
}catch(InterruptedException ex){
exception = ex;
}
...
if(exception != null){
throw exception;
}
java.util.concurrent包和线程同步
java.util.concurrent.CountDownLatch类
CountDownLatch类可有实现“等待指定次数的countDown方法被调用”
public class Main{
private static final int TASKS = 10;//工作个数
public static void main(String[] args){
ExecutorService srv = Executors.newFixedThreadPool(5);
CountDownLatch doneLatch = new CountDownLatch(TASKS);
try{
//开始工作
for(int i=0;i<TASKS;i++){
srv.execute(new MyTask(doneLatch,i));
}
//等待工作结束
doneLatch.await();
}catch(InterruptedException e){
}finally{
srv.shutDown();
}
}
class MyTask implements Runnable{
private final CountDownLatch doneLatch;
private final int context;
public MyTask(CountDownLatch latch, int context){
this.doneLatch = latch;
this.context = context;
}
public void run(){
//doSomethig
...
doneLatch.countDown();
}
}
}
java.util.concurrent.CyclicBarrier类
CountDownLatch只能进行倒数计数。一旦计数值变为0后,即时调用await方法,主线程也会立即返回。
当重复进行线程同步,使用CyclicBarrier。
public class Main{
public static final int THREADS =3;
public static void main(String[] args){
//创建ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
//创建Runnable
Runnnable barrierAction = new Runnable(){
public void run(){
System.out.println("Barrier Action");
}
}
//创建CyclicBarrier用于使线程步调一致
CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
//创建CountDownLatch用于确认工作结束
CountDownLatch doneLatch = new CountDownLatch(THREADS);
try{
//开始工作
for(int t=0;t<THREADS; t++){
executor.execute(new MyTask(phaseBarrier, doneLatch, t));
}
//等待工作就结束
System.out.println("AWAIT");
doneLatch.await();
}catch(InterruptedException e){
}finally{
executor.shutdown();
System.out.println("END");
}
}
class MyTask implements Runnable{
private final CyclicBarrier phaseBarrier;
private final CountDownLatch doneLatch;
private final int context;
private static final PHASES = 5;
public MyTask(CyclicBarrier barrier, CountDownLatch latch, int context){
this.phaseBarrier = barrier;
this.doneLatch = latch;
this.context = context;
}
public void run(){
try{
for(int p = 0; p<PHASES; p++){
//doSomething
phaseBarrier.await();
}
}catch(InterruptedException e){
e.printStackTrace();
}cartch(BrokenBarrierException e){
e.printStackTrace();
}finall{
doneLatch.countDown();
}
}
}
}
新建线程时使用AysncTask或ThreadPoolExecutor或者其他形式自定义线程池的方式;线程池不允许使用Executors去创建,而是使用ThreadPoolExecutor方式去创建。
Exeutors范湖的线程池对象的弊端:
- FixedThreadPool和SingleThreadPool:允许的请求队列的长度是Integer.MAX_VALUE,可能堆积大量的请求,造成OOM
- CachedThreadPool和ScheduledThreadPool:允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,造成OOM。
int NUMBERS_OF_CORES = Runtime.getRuntime().availableProcessors();
int KEEP_ALIVE_TIME = 1;
int KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
BlockedQueue<Runnable> taskQueue = new LinkedBlockedQueue<Runnable>();
ExecutorService service = new ThreadPoolExecutor(NUMBERS_OF_CORES, NUMBERS_OF_CORES*2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT,taskQueue,new BackgroundThreadFactory(), new DefaultRejectedExecutionHandler());
service.execute(new Runnable(){
public void run(){
//doSomething
}
});