高并发编程四

2018-05-15  本文已影响0人  紫雨杰

1、线程池:提供了一个线队列,队列中保存着所有等待状态的线程。避免了频繁的创建与销毁所造成的额外开销,提高了响应的速度。

2、线程池的体系结构:

 java.util.concurrent.Executor:负责线程的使用与调度的根接口【Executor->执行器执行任务的一个接口】
         |---(**)ExecutorService 子接口: 线程池的主要接口
                    |--- ThreadPoolExecutor 线程池的实现类
                    |--- ScheduledExecutorService 子接口:负责线程的调度
                          |--- ScheduledThreadPoolExecutor : 继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口,
                                                             所以该实现类即具备了线程池的功能,又有线程调度的功能。

3、工具类:Executors

 ExecutorService  newFixedThreadPool(): 创建固定大小的线程池;
 ExecutorService  newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量;
 ExecutorService  newSingleThreadExecutor():创建单个线程池。线程池中只有一个线程;
 ScheduledExecutorService  newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务

 newWorkStealingPool:底层是用ForkJoinPool实现的【工作窃取】
 ForkJoinPool

4、工作窃取算法

(1)、工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

(2)、那么为什么需要使用工作窃取算法呢?
        假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,
    并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,
    而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
    而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,
    而窃取任务的线程永远从双端队列的尾部拿任务执行。

(3)、工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,
     其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

5、案例:

★ newWorkStealingPool

public class WorkStealingPoolDemo {
    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool();
        System.out.println(Runtime.getRuntime().availableProcessors());     //输出电脑的CPU核数,创建核数多个线程

        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000)); //daemon
        service.execute(new R(2000));
        
        //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出,所以要添加System.in.read();
        System.in.read(); 
    }

    static class R implements Runnable {
        int time;
        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {     
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }           
            System.out.println(time  + " " + Thread.currentThread().getName());         
        }
    }
}

★ ForkJoinPool 
   (1)、Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
   (2)、ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

   (3)、使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)
                 或execute(ForkJoinTask<T> task)方法来执行指定任务了。

   (4)、其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
        其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

public class ForkJoinPoolDemo {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }       
        System.out.println(Arrays.stream(nums).sum()); //stream api 
    }
    
    
    /*
    static class AddTask extends RecursiveAction {       //继承RecursiveAction,没有返回值  
        int start, end;     
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected void compute() {          
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {        
                int middle = start + (end-start)/2;
                
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }           
        }
    }
    */
    
    static class AddTask extends RecursiveTask<Long> {   //继承RecursiveTask,有返回值,RecursiveTask带有泛型,泛型即为返回值的类型    
        int start, end;     
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {          
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 
            
            int middle = start + (end-start)/2;
            
            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();
            
            return subTask1.join() + subTask2.join();
        }       
    }
    
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);

        long result = task.join();      //join有阻塞,所以不需要写System.in.read();   
        System.out.println(result);
        
        //System.in.read();     
    }
}
上一篇下一篇

猜你喜欢

热点阅读