线程池

2022-12-16  本文已影响0人  arkliu

ThreadPoolExecutor参数解释

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

线程池的执行流程

image.png

ThreadPoolExecutor demo

package com.test.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExePollTest {

    public static void main(String[] args) {
        // 自定义线程池
        // 最大线程该如何定义:
        // 1. cpu密集型:cpu几核就是几,可以保持cpu效率最高
        System.out.println(Runtime.getRuntime().availableProcessors());
        // 2. io密集型:判断程序中,十分消耗io的线程的两倍
        ExecutorService service = new ThreadPoolExecutor(
                    2,
                    5,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardOldestPolicy()
                );
        try {
            // 最大承载线程数:队列大小+最大线程数量
            // AbortPolicy策略:超过RejectedExecutionException
            // CallerRunsPolicy策略: 超过最大承载线程数的部分,由调用该线程的线程执行
            // DiscardOldestPolicy策略: 超过最大承载线程数,不会抛出异常,丢掉任务
            // DiscardOldestPolicy策略: 超过最大承载线程数,尝试和最早的线程竞争,不会抛出异常
            for (int i = 1; i <= 9 ; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                service.execute(new Runnable() {
                    
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName()+"  ok");
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            service.shutdown();
        }
    }
}

java内置线程池

执行ExecutorService

execute(Runnable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
fixedExecutor.execute(new MyRunnable(1));
fixedExecutor.shutdown();

submit(Runnable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future future = fixedExecutor.submit(new MyRunnable(1));
try {
    //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
    System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();

submit(Callable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future<String> future = fixedExecutor.submit(new Callable<String>() {

    @Override
    public String call() throws Exception {
        return "hello world..";
    }
});
try {
    //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
    System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();
image.png

invokeAny

invokeAny会返回所有Callable任务中第一个得到执行完的Callable的结果作为返回值

// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("first runs..");
        return "first";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("second runs..");
        return "second";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("third runs..");
        return "third";
    }
});
String future = null;
try {
    //3. 执行invokeAny,会返回上面listCallable的任意一个Callable的返回值
    future = fixedExecutor.invokeAny(listCallable);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
System.out.println(future);
fixedExecutor.shutdown();
image.png

invokeAll

invokeAll会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象

// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("first runs..");
        return "first";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("second runs..");
        return "second";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("third runs..");
        return "third";
    }
});
List<Future<String>> futures = null;
try {
    futures = fixedExecutor.invokeAll(listCallable);
    
    for (Future<String> future : futures) {
        System.out.println("result:"+future.get());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();
image.png

newCachedThreadPool

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。看下面栗子:

新建一个MyRunnable

class MyRunnable implements Runnable {
    private int id;
    
    public MyRunnable(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" runs..."+id);
    }
    
    @Override
    public String toString() {
        return ""+id;
    }
}

测试

public static void testCachedThreadPool01() {
        //1. 使用工程类获取线程池对象
        ExecutorService cacheExecutor = Executors.newCachedThreadPool();
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png
// 使用工厂类获取线程池对象
    public static void testCachedThreadPool02() {
        ExecutorService cacheExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

newFixedThreadPool

最多n个线程将处于活动状态。如果提交了n个以上的线程,那么它们将保持在队列中,直到线程可用

public static void testnewFixedThreadPool01() {
        // 最多创建3个线程
        ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            fixedExecutor.submit(new MyRunnable(i));
        }
    }
image.png

工厂类获取线程池对象

// 使用工厂类获取线程池对象
    public static void testnewFixedThreadPool02() {
        ExecutorService cacheExecutor = Executors.newFixedThreadPool(3, new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

newSingleThreadExecutor

创建一个核心线程,并且最大线程也是1个,同时它具有一个无边界的阻塞队列LinkedBlockingQueue

public static void testnewSingleThreadExecutor01() {
        ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            fixedExecutor.submit(new MyRunnable(i));
        }
    }
image.png
// 使用工厂类获取线程池对象
    public static void testnewSingleThreadExecutor02() {
        ExecutorService cacheExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

shutdown方法

shutdown将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断,并抛出RejectedExecutionException异常

ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
singleExecutor.execute(()->{
    System.out.println("thread1 runs..");
});
singleExecutor.shutdown();
// 上面已经shutdown了,后续加入进来执行的线程,将抛出RejectedExecutionException
singleExecutor.execute(()->{
    System.out.println("thread2 runs..");
});
image.png

shutdownNow

shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回

ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 10; i++) {
    singleExecutor.submit(new MyRunnable(i));
}
List<Runnable>list = singleExecutor.shutdownNow();
for (Runnable runnable : list) {
    System.out.println("线程:"+runnable+" 被取消了");
}
image.png

延迟或定期执行任务

延迟两秒执行任务

ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);
        scheduledExecutor.schedule(()->{
            System.out.println("scheduledExecutor runs..");
        }, 2, TimeUnit.SECONDS);

间隔执行

ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3, new ThreadFactory() {
    int n = 1;
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "线程:"+(n++));
    }
});
scheduledExecutor.scheduleAtFixedRate(new MyRunnable(1), 2, 2, TimeUnit.SECONDS);
2.gif
上一篇 下一篇

猜你喜欢

热点阅读