多线程

图解多线程设计模式--读书笔记

2018-03-06  本文已影响495人  BangAiN

1.接口和类

I:Excutor
I:ExcutorService
AC:AbstractExecutorService
C:ThreadPoolExecutor

I:ThreadFactory
C:DefaultThreadFactory

C:Executors

I:BlockingQueue
I:Future

2.Thread

1.创建线程的方式
继承Thread类
实现Runnable接口
2.Thread.sleep()

3.线程的互斥处理

1.关键字synchronized
2.互斥(mutual exclusion)
竞态条件(race condition)
线程的互斥机制成为监视(monitor)。另外获取锁有时候也称为“拥有(own)监视”或“持有(hold)锁”。
判断某一个线程是否拥有某一对象的锁:Thread.holdsLock(obj)
sychronized方法默认是this作为锁对象,而sychrosized静态方法使用该对象的类对象作为锁。

4.线程协作

等待队列:所有实例都拥有一个等待队列。它是在实例的wait方法执行后停止操作的线程的队列。
入口队列:针对未持有锁的。
当发生以下任意一种情况时,线程便会退出等待队列:

wait方法

wait方法让线程进入等待队列。
obj.wait()
这叫做“线程正在obj实例上wait”
wait()->this.wait(); "线程正在this上wait"
若要执行wait方法,线程必须持有锁(这是规则)。但如果线程进入等待队列,便会释放其实例的锁。
等待队列其实是一个虚拟的概念。它既不是实例中的字段,也不用于获取正在实例上等待的线程的列表的方法。

notify方法

notify方法会将等待队列中的一个线程取出。
obj.notify()
那么obj等待队列中的一个线程会被取出,然后退出等待队列。
同wait方法一样,若要执行notify方法,线程必须也要持有要调用的实例的锁。

noftiyAll方法

notifyAll方法会将等待队列中所有的线程取出来。
obj.notifyAll()
同wait和notify一样,notifyAll也只能有持有调用实例的锁的线程调用。

线程状态迁移

Thread.Stat:NEW、RUNNABLE、TERMINATED、WAITING、TIMED_WAITING和BLOCKED

Single Threaded Execution

临界区:我们只允许单个线程执行的程序范围。
使用Single Threaded Execution情况:

在使用Single Threaded Execution会发生死锁。
在Single Threaded Execution模式下,满足下列条件就会发生死锁:

Single Threaded Execution会降低程序性能:

相关的模式

Guarded Suspension模式
Read-Write Lock模式
Immutable 模式
Thread-Specify Storage模式

原子操作

Jav编程规范定义了一些原子操作。例如,char、int等基本类型的赋值和引用都是原子操作。另外,引用类型的对象的赋值和引用也是原子操作。例外:long和double的赋值和引用不是原子操作。实际上,大部分java虚拟机也将long和double的操作实现了原子。

volatile关键字

通过java.util.concurrent.atomic包提供了便于原子操作的类,如AtomicInteger、AtomicLong、AtomicIntegerArray和AtomicLongArray等,这是通过volatile封装的类库。

计数信号量和Semaphore类

Single Threaded Execution模式用于确保某个区域“只能由一个线程执行”。而扩展下该模式,确保某个区域“最多由N个线程执行”。这个时候就要用计数信号量来控制线程数量。还有假设能给使用的资源税有N个,而需要使用这些资源的线程数大于N。这会导致资源竞争,因此需要交通管制。这种情况也需要计数信号量。

Semaphore semaphore = new Semaphore(2);
semaphore.acquire();//没有可用资源时,阻塞
try{
    doSomethong();
}finally{
    semaphore.release();//释放资源
}

Immutable模式

标准库中的immutable模式:java.lang.String、java.math.BigInteger、java.math.BigDecimal、java.util.regex.Pattern、java.lang.Integer等

Guarded Suspension模式

Queue LinkendList #peek() #remove() #offer()
BlockingQueue LinkedBlockingQueue #take()和#put()互斥

java.util.concurrent包中队列

java.util.concurrent包提供了BlockingQueue接口及其实现类,它们相当于Producer-Consumer模式中的Channel角色。

BlockingQueue接口-阻塞队列

继承Queue接口,拥有offer方法和poll方法等。实际上,实现阻塞功能的方法是BlockingQueue自身的put方法和take方法。

ArrayBlockingQueue-基于数组的BlockingQueue

表示元素个数有最大限制的BlockingQueue。

LinkedBlockingQueue-基于链表的BlockingQueue

表示元素没有最大限制(内存为满情况下)

PriorityBlockingQueue-带有优先级的BlockingQueue

数据的优先级依据Comparable接口的自然排序,或者构造函数出入的Comparator接口的顺序指定。

DelayQueue-一定时间后才可以take的BlockingQueue

DelayQueue用于存储Delayed对象的队列。当从该队列take时,只有各个元素指定的时间到期后才可以take。另外,到期时间最长的元素将被take。

SynchronousQueue-直接传递的BlockingQueue

ConcurrentLinkedQueue-元素个数没有最大限制的线程安全队列

使用java.util.concurrent.Exchanger类交换缓冲区

Exchanger类用于两个线程安全地交换对象。

Balking模式

Producer-Consumer模式

一般来说,该模式会有多个生产者和多个消费者。当只有一个生产者和一个消费者时又称为Pipe模式。
生产者消费者模式在生产者和消费者之间加入了一个“桥梁角色”,用以消除线程间处理速度的差异。

生产者消费者模式中的角色
Data角色:由生产者负责生产,由消费者消耗。
Producer角色:生产者。生产者生产Data角色,并将其传递给Channel角色。
Consumer角色:消费者。从Channel角色获取Data角色并使用。
Channel角色:通道。Channel保存从Producer角色获取的Data角色,还会响应Consumer角色的请求,传递Data角色。为了安全起见,Channle角色会对Producer角色和Channel角色的访问执行互斥处理。
当Producer传递Data角色给Channel角色时,如果Channle角色的状态不适合接收Data角色,那么Producer角色会一直等待,直到Channel角色的状态适合接收Data角色。
当Consumer角色从Channel角色获取Data角色时,如果Channel角色没有可以提供的Data角色时,Consumer角色会一直等待,直到Channel角色状态可以提供Data角色。

加入了throws InterruptedException的方法

标准库中三个典型方法

三个方法和java.lang.Thread的interrupt方法使用。

notify、nofiyAll和interrupt方法的区别

notify/nofifyAll是java.lang.Object的方法,唤醒的是 该实例等待队列中的线程 ,而不是直接指定的线程。notify/notifyAll唤醒的线程会继续执行wait的下一条语句。另外,执行notify/notifyAll时,线程必须获取实例的锁
interrupt方法是java.lang.Thread的实例方法,可以直接指定线程并唤醒。当被interrupt的线程处于sleep或wait时,会抛出InterruptedException异常。执行interrupt时,不要获取要取消线程的锁。

处于sleep、wait或join时,interrupt后异常抛出的时机:

wait和interrupt:当正在wait的线程被interrupt时(即线程被取消执行时),该线程在重新获取锁之后,抛出InterruptedException。在获取锁之前,不会抛出InterruptedException。
sleep、join和interrupt:当处于sleep的线程或被join的线程被interrupt时,会立马抛出InterruptedException异常。这个和wait状态下interrupt下是有区别的。

调用interrupt一定会抛出InterruptException异常?

不是滴。interrupt方法只是改变了线程的中断状态而已。将线程从非中断状态变为中断状态。
sleep、wait或join方法内部会有线程中断状态的检查。
而只有在执行到sleep、wait或join方法,或者有执行到编写的线程中断状态检查抛出InterruptionException时才会抛出该异常。

java.lang.Thread的实例方法interrupt和类方法interrupted

interrupt方法是将线程从非中断状态切换到中断状态。
Thread.interrupted方法是检查并清除中断状态。若当前线程处于中断状态,则返回true;若处于非中断状态,则返回false。然后将当前线程从中断切换到非中断。

不去使用stop方法

过时方法。stop方法可能会破坏安全性。因为,即使线程正在运行临界区的操作,Thread类也会立即终止该线程的操作。
interrupt方法为什么可以?
interrupt只是改变线程的中断状态。而只有线程执行sleep、wait和join方法时才会抛出InterruptedException异常。

Read-Write Lock模式

多个同学抄黑板,老师等同学抄完再擦黑板。

锁的含义

物理锁:synchrosized关键字定义的锁等
逻辑锁:ReadWriteLock实现的逻辑锁(Before/After模式:防止忘记释放锁)

java.util.concurrent.locks包

物理锁:Lock接口以及三个实现类:ReentrantLock、ReentrantReadWriteLock.ReadLock和ReentrantReadWriteLock.WriteLock。既重入锁、读锁和写锁。

/**
锁必须显示的创建、锁定和释放。
*/
Lock lock = new ReentrantLock();
lock.lock();
try{
    
}finally{
    lock.unlock();
}

ReentrantLock比synchronized的三个特性:

两种锁的底层策略:

可中断锁

如果某一线程 A 正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程 B 不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,如果此时 ReetrantLock 提供的是忽略中断锁,则它不会去理会该中断,而是让线程B继续等待,而如果此时 ReetrantLock 提供的是响应中断锁,那么它便会处理中断,让线程 B 放弃等待,转而去处理其他事情。

ReentrantLock lock = new ReentrantLock();  
...........  
lock.lockInterruptibly();//获取响应中断锁  
try {  
      //更新对象的状态  
      //捕获异常,必要时恢复到原来的不变约束  
      //如果有return语句,放在这里  
}finally{  
    lock.unlock();        //锁必须在finally块中释放  
} 
public class Buffer {  

    private Object lock;  

    public Buffer() {  
        lock = this;  
    }  

    public void write() {  
        synchronized (lock) {  
            long startTime = System.currentTimeMillis();  
            System.out.println("开始往这个buff写入数据…");  
            for (;;)// 模拟要处理很长时间      
            {  
                if (System.currentTimeMillis()  
                        - startTime > Integer.MAX_VALUE) {  
                    break;  
                }  
            }  
            System.out.println("终于写完了");  
        }  
    }  

    public void read() {  
        synchronized (lock) {  
            System.out.println("从这个buff读数据");  
        }  
    }  

    public static void main(String[] args) {  
        Buffer buff = new Buffer();  

        final Writer writer = new Writer(buff);  
        final Reader reader = new Reader(buff);  

        writer.start();  
        reader.start();  

        new Thread(new Runnable() {  

            @Override  
            public void run() {  
                long start = System.currentTimeMillis();  
                for (;;) {  
                    //等5秒钟去中断读      
                    if (System.currentTimeMillis()  
                            - start > 5000) {  
                        System.out.println("不等了,尝试中断");  
                        reader.interrupt();  //尝试中断读线程  
                        break;  
                    }  

                }  

            }  
        }).start();  
        // 我们期待“读”这个线程能退出等待锁,可是事与愿违,一旦读这个线程发现自己得不到锁,  
        // 就一直开始等待了,就算它等死,也得不到锁,因为写线程要21亿秒才能完成 T_T ,即使我们中断它,  
        // 它都不来响应下,看来真的要等死了。这个时候,ReentrantLock给了一种机制让我们来响应中断,  
        // 让“读”能伸能屈,勇敢放弃对这个锁的等待。我们来改写Buffer这个类,就叫BufferInterruptibly吧,可中断缓存。  
    }  
}  

class Writer extends Thread {  

    private Buffer buff;  

    public Writer(Buffer buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  
        buff.write();  
    }  
}  

class Reader extends Thread {  

    private Buffer buff;  

    public Reader(Buffer buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  

        buff.read();//这里估计会一直阻塞      

        System.out.println("读结束");  

    }  
}  

import java.util.concurrent.locks.ReentrantLock;  

public class BufferInterruptibly {  

    private ReentrantLock lock = new ReentrantLock();  

    public void write() {  
        lock.lock();  
        try {  
            long startTime = System.currentTimeMillis();  
            System.out.println("开始往这个buff写入数据…");  
            for (;;)// 模拟要处理很长时间      
            {  
                if (System.currentTimeMillis()  
                        - startTime > Integer.MAX_VALUE) {  
                    break;  
                }  
            }  
            System.out.println("终于写完了");  
        } finally {  
            lock.unlock();  
        }  
    }  

    public void read() throws InterruptedException {  
        lock.lockInterruptibly();// 注意这里,可以响应中断      
        try {  
            System.out.println("从这个buff读数据");  
        } finally {  
            lock.unlock();  
        }  
    }  

    public static void main(String args[]) {  
        BufferInterruptibly buff = new BufferInterruptibly();  

        final Writer2 writer = new Writer2(buff);  
        final Reader2 reader = new Reader2(buff);  

        writer.start();  
        reader.start();  

        new Thread(new Runnable() {  

            @Override  
            public void run() {  
                long start = System.currentTimeMillis();  
                for (;;) {  
                    if (System.currentTimeMillis()  
                            - start > 5000) {  
                        System.out.println("不等了,尝试中断");  
                        reader.interrupt();  //此处中断读操作  
                        break;  
                    }  
                }  
            }  
        }).start();  

    }  
}  

class Reader2 extends Thread {  

    private BufferInterruptibly buff;  

    public Reader2(BufferInterruptibly buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  

        try {  
            buff.read();//可以收到中断的异常,从而有效退出      
        } catch (InterruptedException e) {  
            System.out.println("我不读了");  
        }  

        System.out.println("读结束");  

    }  
}  

class Writer2 extends Thread {  

    private BufferInterruptibly buff;  

    public Writer2(BufferInterruptibly buff) {  
        this.buff = buff;  
    }  

    @Override  
    public void run() {  
        buff.write();  
    }  

}  

条件变量实现线程间协作

在生产者——消费者模型一文中,我们用 synchronized 实现互斥,并配合使用 Object 对象的 wait()和 notify()或 notifyAll()方法来实现线程间协作。Java 5 之后,我们可以用 Reentrantlock 锁配合 Condition 对象上的 await()和 signal()或 signalAll()方法来实现线程间协作。在 ReentrantLock 对象上 newCondition()可以得到一个 Condition 对象,可以通过在 Condition 上调用 await()方法来挂起一个任务(线程),通过在 Condition 上调用 signal()来通知任务,从而唤醒一个任务,或者调用 signalAll()来唤醒所有在这个 Condition 上被其自身挂起的任务。另外,如果使用了公平锁,signalAll()的与 Condition 关联的所有任务将以 FIFO 队列的形式获取锁,如果没有使用公平锁,则获取锁的任务是随机的,这样我们便可以更好地控制处在 await 状态的任务获取锁的顺序。与 notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定唤醒与自身 Condition 对象绑定在一起的任务。

import java.util.concurrent.*;  
import java.util.concurrent.locks.*;  

class Info{ // 定义信息类  
    private String name = "name";//定义name属性,为了与下面set的name属性区别开  
    private String content = "content" ;// 定义content属性,为了与下面set的content属性区别开  
    private boolean flag = true ;   // 设置标志位,初始时先生产  
    private Lock lock = new ReentrantLock();    
    private Condition condition = lock.newCondition(); //产生一个Condition对象  
    public  void set(String name,String content){  
        lock.lock();  
        try{  
            while(!flag){  
                condition.await() ;  
            }  
            this.setName(name) ;    // 设置名称  
            Thread.sleep(300) ;  
            this.setContent(content) ;  // 设置内容  
            flag  = false ; // 改变标志位,表示可以取走  
            condition.signal();  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }finally{  
            lock.unlock();  
        }  
    }  

    public void get(){  
        lock.lock();  
        try{  
            while(flag){  
                condition.await() ;  
            }     
            Thread.sleep(300) ;  
            System.out.println(this.getName() +   
                " --> " + this.getContent()) ;  
            flag  = true ;  // 改变标志位,表示可以生产  
            condition.signal();  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }finally{  
            lock.unlock();  
        }  
    }  

    public void setName(String name){  
        this.name = name ;  
    }  
    public void setContent(String content){  
        this.content = content ;  
    }  
    public String getName(){  
        return this.name ;  
    }  
    public String getContent(){  
        return this.content ;  
    }  
}  
class Producer implements Runnable{ // 通过Runnable实现多线程  
    private Info info = null ;      // 保存Info引用  
    public Producer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        boolean flag = true ;   // 定义标记位  
        for(int i=0;i<10;i++){  
            if(flag){  
                this.info.set("姓名--1","内容--1") ;    // 设置名称  
                flag = false ;  
            }else{  
                this.info.set("姓名--2","内容--2") ;    // 设置名称  
                flag = true ;  
            }  
        }  
    }  
}  
class Consumer implements Runnable{  
    private Info info = null ;  
    public Consumer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        for(int i=0;i<10;i++){  
            this.info.get() ;  
        }  
    }  
}  
public class ThreadCaseDemo{  
    public static void main(String args[]){  
        Info info = new Info(); // 实例化Info对象  
        Producer pro = new Producer(info) ; // 生产者  
        Consumer con = new Consumer(info) ; // 消费者  
        new Thread(pro).start() ;  
        //启动了生产者线程后,再启动消费者线程  
        try{  
            Thread.sleep(500) ;  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }  

        new Thread(con).start() ;  
    }  
}  

JAVA1.5 locks包中提供了实现Read-Write Lock模式的ReadWriteLock接口和ReentrantReadWriteLock类。

ReadWriteLock rwl = new ReentrantReadWriteLock();      
rwl.writeLock().lock()  //获取写锁  
rwl.readLock().lock()  //获取读锁  
ReetrantReadWriteLock类主要特征:

Thread-Per-Message

java.util.concurrent.ThreadFactory

//创建一个线程工厂实力类
ThreadFactory threadFactory = new ThreadFactory(){
    public Thread new Thread(Runnable r){
        return new Thread(r);
    }
}

threadFactory.newThread(new Runnable(){
    public void run(){
        //doSomething
    }
})

java.util.concurrent.Executors类获取的ThreadFactory

它有好多静态方法。比如.Executors.defaultThreadFactory()

//通过Executors获取ThreadFactory
ThreadFactory threadFactory = Executors.defalutThreadFactory();

java.util.concurrent.Executor接口

方法 void execute(Runnable r)
Executor接口将某些“处理的执行”抽象化了,参数Runnable对象表示“执行的处理”的内容。
ThreadFactory接口隐藏了线程创建的细节,但并未隐藏创建线程的操作。而Executor接口创建线程的操作也可以隐藏起来。

Executor executor = new Executor(){
    public void execute(Runnable r){
        new Thread(r).start();
    }
};
executor.execute(new Runnable(){
    //doSomething
})

java.util.concurrent.ExecutorService接口

继承自Executor
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService几个方法

ExecutorService exexutorService = Executors.newCachedThreadPool();
try{
    executorService.execute(new Runnable(){
        public void run(){
            //doSomething
        }
    });
    
    executorService.execute(new Runnable(){
        public void run(){
            //doSomething
        }
    });
}finally{
    executorService.shutDown();
}

java.util.concurrent.ShceduledExecutorService

它有一个shcedule方法
shcedule(Runnable r, long delay, TimeUnit unit)

ScheduledExecutorService executorService = Executors.newShceduledThreadPool(5);
try{
    executorService.shcedule(new Runnable(){
        public void run(){
            //doSomething
        }
    }, 3L, TimeUnit.SECONS);
}finally{
    executorService.shutDown();
}

WorkerThread模式

WorkerThread模式中的角色

扩展思路

WorkerThread和事件分发线程

点击按钮或者移动鼠标的操作被称为“事件(event)”。比如用ActionEvent类的实例表示一个事件。一系列事件就会存储在事件队列中。
进行下类比。

事件分发线程只有一个,并不能体现出多线程的优点,但是这种设计使我们无需在事件分发线程中要执行的方法中实现工人线程间的互斥处理。

java.util.concurrent包和WorkerThreat模式的关系

java.util.concurrent.ThreadPoolExecutor类

ThreadPoolExecutor可以轻松实现WorkerThead模式。
不过通常使用Executors的静态方法实现比较容易

ExecutorService fixeThreadPool = Executors.newFixedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();

Future模式

Future模式角色

java.util.concurrent包和Future模式

java.util.concurrent.Callable接口。Callable接口声明了call方法,call方法和Runnable接口的run方法相似,不同的call方法有返回值。Callable<String>表示Callable接口call方法的返回值类型是String类型。

java.util.concurrent.Future接口相当于Future(VirtualData)角色.Future接口声明了获取数据的get方法,没有声明设置值的方法。设置值的方法要在实现Future接口的类中声明。Future<String>表示Future接口get方法的返回值类型是String。除了get方法,Future接口还声明了用于中断运行的cancel方法。
java.util.concurrent.FutureTask类是实现了Future接口的标准类。FutureTask类声明了用于获取值的get方法,用于中断运行的cancel方法,用于设置值的set方法,以及用于设置异常的setException方法。此外,FutureTask类还实现了Runnable接口,所以它还声明了run方法。

FutureTask<RealData> futureTask = new FutureTask(new Callable(){
    pulbic RealData call(){
        return new RealData();
    }
});

new Thread(futureTask).start();
/**
new Thread(Runnable r).start()后会调用Runnable(FutureTask)中run方法,      
而FutureTask的run方法又会调用Callable中call方法,  
然后会通过FutureTask方法的set方法把call方法返回值进行设置。  
而后就可以通过在当前线程通过FutureTask的实例通过get方法获取该值了。
*/

Two-Phase Termination 模式

public class CanShutDownThread extend Thread{
    private volatile boolean shutDwonRequested = false;
    
    //终止方法
    public void shutDownRequest(){
        shutDownRequested = true;
        interrupt();
    }
    
    //是否终止
    public boolean isShutDownRequested(){
        return shutDownRequested;
    }
    
    public final void run(){
        try{
            while(!shutDownRequested){
                //doSomething
            }
        }catch(InterruptedExcepton exception){
            
        }finally{
            doShutDown();
        }
    }
    
    private void doShutDown(){
        //doSomethingBeforeShutDown
    }
}



java.util.concurrent.ExecutorService 和 Two-Phase Termination模式

ExecutorService有isShutDown方法和isTerminated方法

捕获程序整体终止时

//Main方法
public static void main(String[] args){
    //设置异常处理器
    Thread.setDefalutUncaughtExceptionHandler(new Thread.UncaughtExcptionHandler(){
        public void uncaughtException(Thread thread,Throwable exception){
            //doSomething    
        }
    });
    
    //添加退出钩子
    Runtime.getRuntime().addShutDownHook(new Thread(){
       public void run(){
           //doSomething
       } 
    });
}

优雅地终止线程

中断状态与InterruptedException异常的转换

if(Thread.interrupted()){
    throw new InterruptedException();
}

不想清楚中断状态时

if(Thead.currentThread().isInterrupted()){
    //
}
try{
    Thread.sleep(1000);
}catch(InterruptedException exception){
    
}

上边代码中,被抛出的InterruptedException异常将被忽略。如果某个线程正在执行sleep时,被其他线程中断了,则“已被中断”这个信息将丢失。
如果想要防止“已被中断”这个信息丢失,线程可以再次中断自己。

try{
    Thread.sleep(1000);
}catch(InterruptedException exception){
    Thread.currentThread().interrupt();
}

这就相当于从InterruptedException到中断状态的转换。

InterruptedException excetion = null;
 try{
     Thread.sleep(1000);
 }catch(InterruptedException ex){
     exception  = ex;
 }
 
 ...
 
 if(exception != null){
     throw exception;
 }

java.util.concurrent包和线程同步

java.util.concurrent.CountDownLatch类

CountDownLatch类可有实现“等待指定次数的countDown方法被调用”

public class Main{  
    private static final int TASKS = 10;//工作个数
    public static void main(String[] args){
        ExecutorService srv = Executors.newFixedThreadPool(5);
        
        CountDownLatch doneLatch = new CountDownLatch(TASKS);
        
        try{
            //开始工作
            for(int i=0;i<TASKS;i++){
                srv.execute(new MyTask(doneLatch,i));
            }
            
            //等待工作结束
            doneLatch.await();
        }catch(InterruptedException e){
            
        }finally{
            srv.shutDown();
        }
    }
    
    class MyTask implements Runnable{
        private final CountDownLatch doneLatch;
        private final int context;
        
        public MyTask(CountDownLatch latch, int context){
            this.doneLatch = latch;
            this.context = context;
        }
        
        public void run(){
            //doSomethig
            ...
            doneLatch.countDown();
        }
    }
}

java.util.concurrent.CyclicBarrier类

CountDownLatch只能进行倒数计数。一旦计数值变为0后,即时调用await方法,主线程也会立即返回。
当重复进行线程同步,使用CyclicBarrier。

public class Main{
    public static final int THREADS  =3;
    public static void main(String[] args){
        //创建ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        
        //创建Runnable
        Runnnable barrierAction = new Runnable(){
            public void run(){
                System.out.println("Barrier Action");
            }
        }
        
        //创建CyclicBarrier用于使线程步调一致
        CyclicBarrier phaseBarrier = new CyclicBarrier(THREADS, barrierAction);
        
        //创建CountDownLatch用于确认工作结束
        CountDownLatch doneLatch = new CountDownLatch(THREADS);
        
        try{
            //开始工作
            for(int t=0;t<THREADS; t++){
                executor.execute(new MyTask(phaseBarrier, doneLatch, t));
                
            }
            
            //等待工作就结束
            System.out.println("AWAIT");
            doneLatch.await();
        }catch(InterruptedException e){
            
        }finally{
            executor.shutdown();
            System.out.println("END");
        }
    }
    
    
    class MyTask implements Runnable{
        private final CyclicBarrier phaseBarrier;
        private final CountDownLatch doneLatch;
        private final int context;
        private static final PHASES = 5;
        
        public MyTask(CyclicBarrier barrier, CountDownLatch latch, int context){
            this.phaseBarrier = barrier;
            this.doneLatch = latch;
            this.context = context;
        }
        
        public void run(){
            try{
                for(int p = 0; p<PHASES; p++){
                    //doSomething
                    phaseBarrier.await();
                }
            }catch(InterruptedException e){
                e.printStackTrace();
            }cartch(BrokenBarrierException e){
                e.printStackTrace();
            }finall{
                doneLatch.countDown();
            }
        }
    }
}

新建线程时使用AysncTask或ThreadPoolExecutor或者其他形式自定义线程池的方式;线程池不允许使用Executors去创建,而是使用ThreadPoolExecutor方式去创建。

Exeutors范湖的线程池对象的弊端:

int NUMBERS_OF_CORES = Runtime.getRuntime().availableProcessors();
int KEEP_ALIVE_TIME = 1;
int KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
BlockedQueue<Runnable> taskQueue = new LinkedBlockedQueue<Runnable>();
ExecutorService service = new ThreadPoolExecutor(NUMBERS_OF_CORES, NUMBERS_OF_CORES*2, KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT,taskQueue,new BackgroundThreadFactory(), new DefaultRejectedExecutionHandler());

service.execute(new Runnable(){
    public void run(){
        //doSomething
    }
});

上一篇下一篇

猜你喜欢

热点阅读