ThreadPool线程池浅析(代码讲解)

2020-03-07  本文已影响0人  Minority
转自:http://www.cnblogs.com/ssslinppp/

Case1(Executor):

public class T01_MyExecutor implements Executor {
    public static void main(String[] args) {
        new T01_MyExecutor().execute(()->{
            System.out.println("hello executor");
        });
    }

    @Override
    //Executor就是定义一个接口,重写execute方法来执行一项任务
    public void execute(Runnable command){
        new Thread(command).run();
        //command.run();
    }
}
/*====================output========================
hello executor
 */

Executor是一个接口,重写execute方法来执行一项任务。

Case2(ExecutorService):

ExecutorService 继承自Executor,除了继承自父类的execute方法之外,还重写了submit方法。ExecutorService 相当于后台的服务,等着你往里面扔任务,除了execute方法之外还有submit。

submit里面除了可以往里面扔Runnable之外还可以往里面扔callable,当你需要一个线程运行完有返回值的时候,用callable,不需要则用没有返回值的Runnable。

Case3(Callable):

Callable是对Runnable进行了扩展,Runnable里面有一个run()方法,Callable里面有一个call()方法,其和run()类似。是被其他线程进行执行的方法。主要的区别是:一个有返回值一个没有。当你需要一个线程运行完有返回值的时候,用callable,不需要则用Runnable。

Case4(Executors):

Executors是一个实现类,而不是接口。区别于Executor,Executor是一个接口,Executors是一个工具类,操作Executor的一个工具类。

Case5(线程池的概念 && newFixedThreadPool):

线程池的概念:维护一堆线程,装在某个容器里等着任务来运行,线程池里面的线程执行完任务之后并不会消失(空闲),新的任务来啦,不需要启动新的线程。如果线程池里面的线程都忙不过来,则把“任务”扔到一个队列里面,等着线程池里面的线程来拿。

线程池维护的队列:

  • 未执行的任务队列
  • 完成的任务队列

线程池创建的常用三种方式:

    1. newFixedThreadPool:使用LinkedBlockingQueue实现,定长线程池。
    1. newSingleThreadExecutor:使用LinkedBlockingQueue实现,一池只有一个线程。
    1. newCachedThreadPool:使用SynchronousQueue实现,变长线程池。
    1. 附加几种:
      • newScheduledThreadPool:执行定时的任务,里面的线程是可以复用
      • newWorkStealingPool:工作窃取线程池
      • ForkJoinPool:任务可以自动进行切分

查看线程池维护的队列Demo:

package ThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T05_ThreadPool {
    public static void main(String[] args) throws InterruptedException {

        //ExecutorService是接口;Executors是工具类;(子类指向父接口)。newFixedThreadPool是用来创建固定线程个数的线程池,里面线程个数为5
        ExecutorService service= Executors.newFixedThreadPool(5);   //向ExecutorService里面扔任务的方法:execute submit
        for (int i = 0; i < 6; i++) {

            //线程池的五个线程“抢着”执行六个方法
            service.execute(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }

        System.out.println(service);

        //线程干完活就关掉线程池服务。shutdownNow()不管你干没干完,直接关掉
        service.shutdown();
        System.out.println(service.isTerminated());  //isTerminated:是否都执行完啦
        System.out.println(service.isShutdown());  //是否关闭,关闭不代表执行完啦,有可能正在执行之中等着关闭
        System.out.println(service);


        //主线程睡5秒钟
        TimeUnit.SECONDS.sleep(5);

        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);


    }
}
/*
* 运行结果:
* java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-5
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-5
true
true
java.util.concurrent.ThreadPoolExecutor@7cca494b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
* */

由上面的程序可以看出,线程池维护了两个队列:queued tasks和completed tasks。newFixedThreadPool是用来创建固定线程个数的线程池,上面代码中里面线程个数为5。

Case6(Future && FutureTask):

FutureTask是Future 的一个实现类,其是可取消的异步计算。具有启动和取消计算的方法,查询计算是否完整,并检索计算结果。 结果只能在计算完成后才能检索; 如果计算尚未完成,则get()方法将阻止。 一旦计算完成,则无法重新启动或取消计算(会使用之前的计算结果。除非使用runAndReset()调用计算)

FutureTask,未来的任务,有一个返回值,比如FutureTask<Integer>,任务的返回值是Integer类型,FutureTask里面可以装一个Callalbe类型(有返回值,Integer call(); ),把Callable包装成一个FutureTask,FutureTask继承自RunnableFuture接口。所以FutureTask相当于一个适配器,使得可以使用Callable来创建线程(Thread类没有提供传入Callable的构造函数,Thread的构造方法里面不能接受Callable接口)。

package ThreadPool;


import java.util.concurrent.*;


public class T06_Future {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        FutureTask<Integer> task=new FutureTask<>(()->{  //把Callable包装成一个FutureTask
            TimeUnit.MILLISECONDS.sleep(500);
            return 1000;
        });

        new Thread(task).start();

        System.out.println(task.get());//阻塞

        ExecutorService service= Executors.newFixedThreadPool(5);

        //Future拿到未来的一个返回值
        Future<Integer> f=service.submit(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1;
        });

        System.out.println(service);

        System.out.println(f.isDone());
        System.out.println(f.get()+task.get());  //get是阻塞式的,建议把futureTask.get()放最后,因为其要求获得Callable线程计算的结果,如果计算没有完成就会强求,会导致阻塞,直到计算完成
        System.out.println(f.isDone());

    }
}
/*====================output========================
1000
java.util.concurrent.ThreadPoolExecutor@214c265e[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
false
1001
true
*/

get是阻塞式的,建议把futureTask.get()放最后,因为其要求获得Callable线程计算的结果,如果计算没有完成就会强求,会导致阻塞,直到计算完成。

Case7(newCachedThreadPool):

package ThreadPool;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class T08_CachedPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service= Executors.newCachedThreadPool();
        System.out.println(service);

        for (int i = 0; i < 2; i++) {
            service.execute(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }

        System.out.println(service);


        TimeUnit.SECONDS.sleep(80);

        System.out.println(service);
    }
}


/*
* 等待了80s之后,会发现线程池里面的线程没啦,tasks变为了2.
*java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
 java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
 pool-1-thread-2
 pool-1-thread-1
 java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
*
* */

newCachedThreadPool,刚开始一个线程都没有,来一个任务就起一个线程,如果来一个任务时线程池里面有空闲的线程,则立即执行,如果来一个任务线程池里面没有空闲的线程,则再起一个线程,线程的最大数量直到你的内存崩了之后(闲置的线程超过60s[alive time]后就自动销毁了)。

Case8(newSingleThreadExecutor):

package ThreadPool;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class T09_SingleThreadPool {
    public static void main(String[] args) {

        ExecutorService service= Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j=i;
            service.execute(()->{
                System.out.println(j+" "+Thread.currentThread().getName());
            });
        }
    }
}

线程池里面就有一个线程,即使任务多也就是能一个。他的作用是保证任务前后是顺序执行的。既然只有一个线程,为什么不直接使用new Thread呢?是因为他没有每次都创建线程的开销。

Case9(newScheduledThreadPool):

package ThreadPool;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class T10_SchedulePool {
    public static void main(String[] args) {


        ScheduledExecutorService service= Executors.newScheduledThreadPool(4);

        //以固定的频率来执行某个任务:参数列表:Runnable command,
        //                                   long initialDelay,
        //                                   long period,
        //                                   TimeUnit unit
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        },0,500,TimeUnit.MILLISECONDS);  //第一个任务马上起来执行,每隔500ms任务重复执行
    }
}
/*====================output========================
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
......
*/

用来执行定时的任务,里面的线程是可以复用的。上面的代码中,打印任务马上起来执行,并且会每隔500ms任务重复执行。并且每次执行任务的线程也不唯一,线程也是可以复用的。

Case10(newWorkStealingPool):

package ThreadPool;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static javafx.scene.input.KeyCode.R;


public class T11_WorkStealingPool {
    public static void main(String[] args) throws IOException {

        ExecutorService service= Executors.newWorkStealingPool();


        //看自己的机器的核数
        System.out.println(Runtime.getRuntime().availableProcessors());

        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));

        //由于产生的精灵线程(守护线程、后台线程,daemon),主线程不阻塞的话,看不到输出
        System.in.read();
    }

    static class R implements Runnable{

        int time;

        R(int t){
            this.time=t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time+" "+Thread.currentThread().getName());
        }
    }
}
/*====================output========================
8
1000 ForkJoinPool-1-worker-1
2000 ForkJoinPool-1-worker-4
2000 ForkJoinPool-1-worker-3
2000 ForkJoinPool-1-worker-2
2000 ForkJoinPool-1-worker-5
*/

工作窃取线程池,每个线程都维护一个自已的任务队列,若某个线程自己的任务队列空了之后,回去其他的线程的任务队列中去偷任务来执行(主动找活干,以前是得分配)。线程池中的每个线程都是守护线程(daemon)。

Case11(ForkJoinPool):

package ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums=new int[1000000];
    static final int MAX_NUM=50000;  //每一个小任务不超过50000个
    static Random r=new Random();


    //对1000000个数算总和
    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i]=r.nextInt(100);
        }

        //stream API,内部就是for循环,把这些加起来
        System.out.println(Arrays.stream(nums).sum());
    }


    //RecursiveAction是没有返回值的,无法做总和的计算
    /*static class AddTask extends RecursiveAction{

        int start,end;

        AddTask(int s,int e){
            this.start=s;
            this.end=e;
        }

        @Override
        protected void compute() {
            //如果分给某个线程的任务数小于MAX_NUM),就不再进行fork啦
            if (end-start<=MAX_NUM){
                long sum=0L;
                for (int i = start; i < end; i++) {
                    sum+=nums[i];
                }
                System.out.println("from:"+start+" to:"+end+" = "+sum);
            }else{
                int middle=start+(end-start)/2;    //如果大于MAX_VALUE,就切一半

                AddTask subTask1=new AddTask(start,middle);  //类似于递归,递归的过程是ForkJoinPool帮助维护的
                AddTask subTask2=new AddTask(middle,end);   //类似于递归

                subTask1.fork();   //只要一fork,就会有新的线程启动
                subTask2.fork();
            }
        }
    }*/

    static class AddTask extends RecursiveTask<Long>{

        int start,end;

        AddTask(int s,int e){
            this.start=s;
            this.end=e;
        }

        @Override
        protected Long compute() {

            //如果分给某个线程的任务数小于MAX_NUM),就不再进行fork啦
            if (end-start<=MAX_NUM){
                long sum=0L;
                for (int i = start; i < end; i++) {
                    sum+=nums[i];
                }
                return sum;
            }

            int middle=start+(end-start)/2; //如果大于MAX_VALUE,就切一半

            AddTask subTask1=new AddTask(start,middle);  //类似于递归,递归的过程是ForkJoinPool帮助维护的
            AddTask subTask2=new AddTask(middle,end);

            subTask1.fork(); //只要一fork,就会有新的线程启动
            subTask2.fork();


            //RecursiveTask具有返回值
            return subTask1.join()+subTask2.join();

        }
    }

    public static void main(String[] args) throws IOException {
    

        ForkJoinPool fjp=new ForkJoinPool();
        //大任务
        AddTask task=new AddTask(0,nums.length);
        //直接做计算
        fjp.execute(task);

        //也可以直接使用task.join拿到结果,join是阻塞的,不用再System.in.read();
        //long result=task.join();
        //System.out.println(result);

        System.in.read();
    }

}
/*====================output========================
49548337
 */

fork:分叉,join:合并。ForkJoin 和 MapReduce(大数据)有点像。如果有一个非常大的大任务,则可以把这些切分成一个一个的小任务(如果小任务还是太大,则可以继续分,至于分成多小,可以自己指定),分完之后把结果进行合并,产生总的结果。

应用场景:大规模的数据计算

ForkJoinPool里面执行的任务必须是ForkJoinTask,这个任务可以自动进行切分。切分的指定通常继承RecursiveTask(有返回值) or RecursiveAction(无返回值)来进行切分。ForkJoin 的切分类似于递归,递归的过程是ForkJoinPool帮助维护的。

注意:

除了ForkJoinPool,其他线程池都是用的ThreadPoolExecutor这个类,如果想定义自己的线程池,可以使用ThreadPoolExecutor传入相关的参数(7个参数)。
详见:https://www.jianshu.com/p/8dcc81abde4d



参考:马士兵老师java多线程高并发编程

上一篇 下一篇

猜你喜欢

热点阅读