Java常用并发包

2019-04-21  本文已影响0人  千淘萬漉

一、Fork/Join


Java7提供了Fork/Join用于并行执行任务的框架, 可以把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决,对开发来说也不再需要处理各种并行相关事务,例如同步、通信、死锁等问题,需要做的就是拆分任务并组合每个子任务的中间结果。

1.工作窃取

JDK1.7引入的Fork/Join框架就是基于工作窃取算法,是指某个线程从其他队列里窃取任务来执行。工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

一般会使用双端队列,比如AB线程分别处理AB两个任务队列,当有一个线程执行完一个任务队列时,会去窃取另一个未完成的队列任务,而被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

source: Fork/Join框架介绍

2.类关系和API

在RecursiveTask类中最重要的是实现compute()接口,此处定义了计算和拆分方法。
fork() - 每个子任务在调用fork方法时,又会进入compute方法,如果还要拆分则继续递归调用。
join() - 使用join方法会等待子任务执行完并得到其结果。
invokeAll() - 可变参数,触发所有输入的Task来计算。

假设在计算一个求和算法时,就可以采用Fork/Join方法进行并行计算。

public class Calculator extends RecursiveTask<Integer> {  
 
    private static final int THRESHOLD = 100;  
    private int start;  
    private int end;  
  
    public Calculator(int start, int end) {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() {  
        int sum = 0;  
        if((start - end) < THRESHOLD){  
            for(int i = start; i< end;i++){  
                sum += i;  
            }  
        }else{  
            int middle = (start + end) /2;  
            Calculator left = new Calculator(start, middle);  
            Calculator right = new Calculator(middle + 1, end);  
            left.fork();  
            right.fork();  
            sum = left.join() + right.join();  
        }  
        return sum;  
    }  

}  

source:Java 7 Fork/Join 并行计算框架概览

二、CountDownLatch ( 重点! )


CountDownLatch,又名发令枪。这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

1.API及用法

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量,每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

//参数count为计数值
public CountDownLatch(int count) {  };  
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };   
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException{};  
//将count值减1
public void countDown() { };  

2.使用场景

三、CyclicBarrier


CyclicBarrier——回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

//参数parties指让多少个线程或者任务等待至barrier状态
public CyclicBarrier(int parties) {}
//参数barrierAction为当这些线程都达到barrier状态时会执行的内容
public CyclicBarrier(int parties, Runnable barrierAction) {}

CyclicBarrier中最重要的方法就是await方法,有重载方法但是一般采用:

//用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务
public int await() throws InterruptedException, BrokenBarrierException { };

处理同一个动作有多个线程同时执行的场景,用一个栅栏卡在同一个状态,只有当所有线程抵达该状态了,才可以全部放行。

public class TestCyclicBarrier {
    public static void main(String[] args) {
        //1.得到一个CyclicBarrier实例
        CyclicBarrier cb = new CyclicBarrier(4);
        new Thread(new Fishing(cb),"1").start();
        new Thread(new Fishing(cb),"2").start();
        new Thread(new Fishing(cb),"3").start();
        new Thread(new Fishing(cb),"4").start();
    }
    
    static class Fishing implements Runnable{
        CyclicBarrier cb;
        public Fishing(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            try {
                cb.await();
                System.out.println("第(" + Thread.currentThread().getName() 
                                        + ")个人开始钓鱼");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

CountDownLatch和CyclicBarrier的区别:

四、Semaphore(推荐)


Semaphore——信号量,控制同时访问某个特定资源的线程数量,用在流量控制。

Semaphore可以控同时访问的线程个数,一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少。其包含两个构造方法:

 //参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {         
    sync = new NonfairSync(permits);
}
 //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {   
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

//获取一个许可
public void acquire() throws InterruptedException {  }     
//获取permits个许可
public void acquire(int permits) throws InterruptedException { }    
//释放一个许可
public void release() { }          
//释放permits个许可
public void release(int permits) { }    

source: Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

五、Callable、Future和FutureTask

java.lang.Runnable接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。如果想要返回结果,则可以采用Callable。

public interface Callable<V> {
    V call() throws Exception;
}
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
public FutureTask(Callable<V> callable) { }
public FutureTask(Runnable runnable, V result) { }

实际使用的时候,可先定义Callble类、并重写call方法:

public class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

接着可以有两种方式去启动有返回值的线程方法:

//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
//创建Callable对象任务  
Task task = new Task();
//提交任务并获取执行结果  
Future<Integer> result = executor.submit(task);
//关闭线程池,拿完线程就可以把池子关闭了,及时释放资源
executor.shutdown();
//构造一个有返回值的并行任务        
Task task = new Task();
//构造FutureTask        
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
//构造一个新线程并注入FutureTask       
Thread thread = new Thread(futureTask);
//开启线程
thread.start();
try {
     if(futureTask.get()!=null){  
        System.out.println("task运行结果"+futureTask.get());
     }else{
        System.out.println("future.get()未获取到结果"); 
     }
} catch (ExecutionException e) {
     e.printStackTrace();
}

source: Java并发编程:Callable、Future和FutureTask

上一篇下一篇

猜你喜欢

热点阅读