Java:线程池Executors.newFixedThread
摘要:Java
,多线程
,线程池
多线程编程和线程池概述
(1)多线程程序:
计算机可以实现多任务 ( multitasking ),在同一刻运行多个程序,多线程程序在较低的层次上扩展了多任务的概念,即一个程序同时执行多个任务,每一个任务称为一个线程 ( thread ) 。可以同时运行一个以上线程的程序称为多线程程序 ( multithreaded ) 。和多进程相比,多线程之间是共享变量的,这使得线程之间的通信比进程之间的通信更有效更容易,另外线程更轻量级, 创建撤销一个线程比启动新进程的开销要小得多
(2)线程池
构建一个新的线程是有一定代价的 , 因为涉及与操作系统的交互 。 如果程序中创建了大量的生命期很短的线程 , 应该使用线程池 (thread pool) 。 一个线程池中包含许多准备运行的空闲线程,线程执行完任务后不会死亡, 而是在池中准备为下一个请求提供服务,另一个使用线程池的理由是减少并发线程的数目,创建大量线程会大大降低性能甚至使机器崩溃 ,提交的任务数多于空闲的线程数, 那么把得不到服务的任务放置到队列中,总结线程池的出现的目的:
- 减少频繁创建撤销线程导致的性能损耗
- 限制并发数量,防止线程过多机器崩溃
Java线程池Executors.newFixedThreadPool简单使用
实现线程池的一种常用方法是调用Executors.newFixedThreadPool静态方法,execute() 没有返回值;而 submit() 有返回值
public static void main(String[] args) {
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
for (int i = 1; i < 20; i++) {
final int finalI = i;
executorService1.execute(() -> {
try {
System.out.println(finalI);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService1.shutdown();
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
for (int i = 1; i < 10; i++) {
int finalI = i;
Future<?> sub = executorService1.submit(() -> {
try {
System.out.println(finalI);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(sub.get());
}
executorService1.shutdown();
}
Java线程池工程案例
写一个完成的Java多线程任务,分为6个步骤
- 获取任务列表:读取任务列表,每各类表元素基于JSONObject进行加工
- 创建线程池:使用
Executors.newFixedThreadPool
创建线程池,指定线程数
- 多线程任务启动:使用线程池执
execute
方法启动多线程任务 - 多线程任务过程控制:使用
java.util.concurrent.atomic.AtomicBoolean
,java.util.concurrent.atomic.AtomicInteger
记录多线程任务执行中的状态,如果状态失败则直接退出线程池,同时控制主线程打印日志 - 任务结束:数据处理完毕,比如对数据简单加工过滤最后保存到本地
- 线程池关闭:调用线程池执行器
ExecutorService
的shutdownNow
,awaitTermination
方法关闭线程池
package com.example.Multithreading;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.fastjson.JSONObject;
import com.example.Multithreading.utils.Config;
import com.example.Multithreading.utils.FileUtils;
import com.example.Multithreading.utils.SolrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);
public static void main(String[] args) {
List<JSONObject> data = FileUtils.readDate();
LOGGER.info("初始数据量{}", data.size());
long startTime = System.currentTimeMillis();
ExecutorService threadPoolExecutor = null;
AtomicInteger finishBatch = new AtomicInteger(0);
AtomicBoolean error = new AtomicBoolean(false);
try {
// 线程池配置
int workCount = Integer.parseInt(Config.getString("executorWorkCount", "5"));
threadPoolExecutor = Executors.newFixedThreadPool(workCount);
List<JSONObject> errorList = new ArrayList<>();
for (JSONObject jsonObject : data) {
threadPoolExecutor.execute(() -> {
try {
String entName = jsonObject.getString("entName");
long entDocumentCount = SolrUtils.getEntDocumentCount(entName);
if (entDocumentCount != 0) {
String recent = SolrUtils.getEntRecentDate(entName);
if (recent != null) {
jsonObject.put("count", entDocumentCount);
jsonObject.put("recent", recent);
} else {
errorList.add(jsonObject);
}
} else {
errorList.add(jsonObject);
}
} catch (Exception e) {
error.set(true);
e.printStackTrace();
} finally {
finishBatch.getAndIncrement();
}
});
}
// 等待所有查询完
while (finishBatch.get() != data.size()) {
TimeUnit.MILLISECONDS.sleep(1000);
LOGGER.info("完成" + finishBatch.get() * 100 / data.size() + "%");
if (error.get()) {
LOGGER.error("执行线程发生异常退出!");
System.exit(1);
}
}
if (error.get()) {
LOGGER.error("执行线程发生异常退出!");
System.exit(1);
}
// 过滤
if (!errorList.isEmpty()) {
data.removeAll(errorList);
}
LOGGER.info("过滤后数据量{}", data.size());
// 结果写入本地
FileUtils.writeDate(data);
LOGGER.info("写入本地完成,任务结束");
long endTime = System.currentTimeMillis();
LOGGER.info("耗时{}秒", (endTime - startTime) / 1000); // 耗时371秒
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != threadPoolExecutor) {
threadPoolExecutor.shutdownNow();
try {
while (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOGGER.info("线程池还未关闭");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输出日志如下
2021-08-24 18:26:29 INFO Config:61 - 完成99%
2021-08-24 18:26:30 INFO Config:61 - 完成99%
2021-08-24 18:26:31 INFO Config:61 - 完成100%
2021-08-24 18:26:31 INFO Config:75 - 过滤后数据量371
2021-08-24 18:26:31 INFO Config:78 - 写入本地完成,任务结束
2021-08-24 18:26:31 INFO Config:80 - 耗时412秒
配置线程池的大小
一般需要根据任务的类型来配置线程池大小:
- 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
- 如果是IO密集型任务,参考值可以设置为 2 * NCPU
在《Java并发编程实践》中,是这样来计算线程池的线程数目的:
给定下列定义:
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1,期望Ucpu等于1
W/C = 等待时间与计算时间的比率,如果以IO密集为主,W/C接近1,如果以计算密集为主,W/C接近0
为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads = Ncpu x Ucpu x (1 + W/C)
如果以计算密集为主,W/C等于0,假设目标CPU使用率为1,则Nthreads=Ncpu,+1的目的是防止恰好在某时因为发生一个页错误或者因其他原因而暂停
,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作
如果以IO密集为主,W/C等于1,假设目标CPU使用率为1,则Nthreads=2Ncpu
结论:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程
参考:https://cloud.tencent.com/developer/article/1806245