分布式多线程线程池

2019-03-07  本文已影响0人  夜阑人儿未静

说到多线程,概念性东西就不一一赘述了,首先回顾下线程的创建。

Java线程创建的四种方式

1.继承Thread类,重写run方法

static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}

public static void main(String[] args) {
ThreadDemo thread = new ThreadDemo();
thread.setDaemon(true);
thread.setName("thread_demo");
thread.start();
}

2.实现Runnable接口,重写run方法,实现Runnable接口的实现类的实例对象作为Thread构造函数的target

static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}

public static void main(String[] args) {
Thread thread = new Thread(new RunnableDemo());
thread.start();
}

3.通过Callable和FutureTask创建线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callable = new CallableDemo();
FutureTask<Object> futureTask = new FutureTask<>(callable);
new Thread(futureTask)..start();
Object o = futureTask.get();
}

static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}

可以看出Callable与Runable的区别在于Callable带有返回值且可以检测线程是否完成

4.通过线程池创建线程

static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//业务代码......
}
}

static class RunnableDemo implements Runnable{
@Override
public void run() {
//业务代码......
}
}

static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//业务代码......
return null;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);

executorService.execute(new ThreadDemo());

executorService.execute(new RunnableDemo());

FutureTask<Object> futureTask = new FutureTask<>(new CallableDemo());
Future<?> submit = executorService.submit(futureTask);
submit.get();
}

说到线程池,Executor提供了四种线程池

1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

但安装编码规约插件的同学会发现用Executor创建线程池会爆红提示,当然也给出了解释:

image.png

找到源码点进去一探究竟

image.png

newFixedThreadPool除了设置了核心线程数和最大线程数,其他用的都是默认值。

那来了解下ThreadPoolExecutor的核心参数

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

参数设置不当是会出现oom的哦,所以要注意核心参数的默认值

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()

参数设置了,在饱和的情况下ThreadPoolExecutor的处理顺序是什么样子的呢?

最后分享个自己在项目中常用的线程池创建工具类

@Slf4j
public class LocalThreadPool {

public final static String poolName = "thread_pool";

private volatile static LocalThreadPool singletonPool;

private ThreadPoolExecutor executor;

private ThreadPoolExecutor callable;

public static LocalThreadPool getInstance(){
if(singletonPool == null){
synchronized (LocalThreadPool.class){
if(singletonPool == null){
singletonPool = new LocalThreadPool();
}
}
}
return singletonPool;
}

private LocalThreadPool(){

//runnable
final AtomicInteger runnableId = new AtomicInteger(0);

ThreadFactory runableFactory = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {

Thread thread = new Thread(r,"thread_pool_executor_"+runnableId);

thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};

//callable
final AtomicInteger callableId = new AtomicInteger(0);

ThreadFactory callableFactory = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {

Thread thread = new Thread(r,"thread_pool_callable"+callableId);

thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};

executor = new ThreadPoolExecutor(10,20,60,TimeUnit.SECONDS,

new LinkedBlockingQueue<>(20),runableFactory,new RejectedExecutionHandler(){

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run runnable");
}
}
});
callable = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20),
callableFactory, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run callable");
}
}
});
}
public void execute(Runnable r){
executor.execute(r);
}
public <T> Future<T> submit(Callable<T> c){
return callable.submit(c);
}
}

用起来非常之方便

public static void main(String[] args) {
Future<Object> submit = LocalThreadPool.getInstance().submit(new Callable<Object>() {
@Override
public Object call() {
return null;
}
});
LocalThreadPool.getInstance().execute(new Runnable() {
@Override
public void run() {
//业务代码......
}
});
}
上一篇 下一篇

猜你喜欢

热点阅读