一个多线程
2022-01-25 本文已影响0人
你家门口的两朵云
依赖
<!-- Guava: supplies com.google.common.util.concurrent.ThreadFactoryBuilder, which the Paraller utility class below uses to name its pool threads. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
写一个工具类
package edu.hgnu.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.*;
/**
 * Static helper that runs {@link ParallerAction} tasks on one shared, JVM-wide thread pool.
 *
 * <p>Keep the number of tasks in flight at or below 2048: {@link #execute(ParallerAction)}
 * blocks once that many tasks submitted through it are still running.
 */
public class Paraller {
    /** Maximum number of tasks submitted through {@link #execute(ParallerAction)} that may be in flight. */
    private static final int MAX_GLOBAL_TASKS = 2048;

    /** Upper bound on the per-call concurrency of {@link #forEach(List, ParallelOptions)}. */
    private static final int MAX_FOREACH_THREADS = 1024;

    private static final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("hbd-core-pool-%d").build();

    private static final Semaphore globalSemaphore = new Semaphore(MAX_GLOBAL_TASKS);

    // Common thread pool shared by every caller of this class.
    private static final ExecutorService pool;

    static {
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        // A SynchronousQueue has no capacity, so each submitted task is handed straight to an
        // idle thread or causes a new thread to be started. Threads above the core size are
        // retired after 5 seconds of idleness; submissions the pool cannot accept are rejected
        // with an exception (AbortPolicy).
        pool = new ThreadPoolExecutor(corePoolSize, 1024 * 20,
                5L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Shuts the shared pool down. In most cases there is no need to call this.
     *
     * <p>Call it only once no more work will be submitted: after shutdown the pool rejects
     * every new task handed to {@link #execute(ParallerAction)} or {@link #forEach(List)}.
     */
    public static void shutDown() {
        pool.shutdown();
    }

    /**
     * Submits one task to the shared pool without waiting for it to finish.
     *
     * <p>Blocks while 2048 tasks submitted through this method are still in flight. Never
     * throws: a failure inside the task, a rejected submission or an interruption while
     * waiting is printed to standard error.
     *
     * @param action the task to run
     */
    public static void execute(ParallerAction action) {
        try {
            globalSemaphore.acquire();
        } catch (InterruptedException ex) {
            // No permit was obtained, so there is nothing to release; keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
            return;
        }
        try {
            pool.execute(() -> {
                try {
                    action.action();
                } catch (Throwable ex) {
                    ex.printStackTrace();
                } finally {
                    globalSemaphore.release();
                }
            });
        } catch (Throwable ex) {
            // The task never started (e.g. the pool was shut down), so its finally block will
            // not run: hand the permit back here or it would be lost for good.
            // Caught broadly to preserve this method's never-throws behaviour.
            globalSemaphore.release();
            ex.printStackTrace();
        }
    }

    /**
     * Runs every action and waits for all of them, using one thread per available processor.
     *
     * @param actions the tasks to run; {@code null} or an empty list is a no-op
     */
    public static void forEach(final List<ParallerAction> actions) {
        forEach(actions, null);
    }

    /**
     * Runs every action on the shared pool and blocks until all of them have finished.
     *
     * <p>At most {@code threadCount} actions run at the same time. The value comes from
     * {@code parallelOptions}; when that is {@code null} or its thread count is not positive,
     * the number of available processors is used. The value is capped at 1024.
     *
     * <p>Never throws. A failure inside an action or a rejected submission is printed to
     * standard error and the remaining actions still run. If the calling thread is
     * interrupted, its interrupt flag is restored and the method returns without submitting
     * or waiting for the remaining actions.
     *
     * @param actions         the tasks to run; {@code null} or an empty list is a no-op
     * @param parallelOptions concurrency settings, may be {@code null}
     */
    public static void forEach(final List<ParallerAction> actions, ParallelOptions parallelOptions) {
        if (actions == null || actions.isEmpty()) {
            return;
        }
        int requested = (parallelOptions == null) ? 0 : parallelOptions.threadCount;
        int threadCount = (requested > 0) ? requested : Runtime.getRuntime().availableProcessors();
        threadCount = Math.min(threadCount, MAX_FOREACH_THREADS);

        final CountDownLatch latch = new CountDownLatch(actions.size());
        // Throttles how many actions of this call are in the pool at once.
        final Semaphore semaphore = new Semaphore(threadCount);
        for (final ParallerAction item : actions) {
            try {
                // Blocks until a slot is free; each finished action frees one slot.
                semaphore.acquire();
            } catch (InterruptedException ex) {
                // Stop submitting: carrying on would leave this item uncounted and the final
                // await could never complete.
                Thread.currentThread().interrupt();
                ex.printStackTrace();
                return;
            }
            try {
                pool.execute(() -> {
                    try {
                        item.action();
                    } catch (Throwable ex) {
                        ex.printStackTrace();
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                });
            } catch (Throwable ex) {
                // The action was never started (e.g. the pool was shut down). Account for it
                // here, otherwise the latch never reaches zero and await() blocks forever.
                // Caught broadly to preserve this method's never-throws behaviour.
                semaphore.release();
                latch.countDown();
                ex.printStackTrace();
            }
        }
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        }
    }

    /** Options for {@link #forEach(List, ParallelOptions)}. */
    public static class ParallelOptions {
        /** Number of actions allowed to run concurrently; values {@code <= 0} select the default. */
        private int threadCount;

        public ParallelOptions() {
        }

        public ParallelOptions(int threadCount) {
            this.threadCount = threadCount;
        }

        public int getThreadCount() {
            return threadCount;
        }

        public void setThreadCount(int threadCount) {
            this.threadCount = threadCount;
        }
    }

    /** A unit of work that can be run by {@link Paraller}. */
    @FunctionalInterface
    public interface ParallerAction {
        /** Performs the work. */
        void action();
    }
}
使用多线程执行任务(以下方法和内部类都写在 MulThread 类中,供后面的测试类调用)
/**
 * Demo: builds a fixed list of employees, groups them by employee id and prints each
 * group on the shared {@link Paraller} pool, one task per group.
 *
 * <p>The shared pool is shut down before this method returns, so {@link Paraller} cannot
 * be used for further work afterwards.
 */
public void mulThreadRun(){
    List<Emp> list = new ArrayList<>();
    list.add(new Emp().setEmpId("444").setName("李六"));
    list.add(new Emp().setEmpId("111").setName("张三"));
    list.add(new Emp().setEmpId("222").setName("Mr z"));
    list.add(new Emp().setEmpId("111").setName("张四"));
    list.add(new Emp().setEmpId("222").setName("Mr c"));
    list.add(new Emp().setEmpId("333").setName("Korean"));
    list.add(new Emp().setEmpId("111").setName("张五"));
    list.add(new Emp().setEmpId("333").setName("3060 TI"));
    list.add(new Emp().setEmpId("111").setName("张六"));
    list.add(new Emp().setEmpId("333").setName("2060 TI"));
    list.add(new Emp().setEmpId("444").setName("李四"));
    list.add(new Emp().setEmpId("333").setName("1080 TI"));
    list.add(new Emp().setEmpId("444").setName("李五"));
    System.out.println(list);
    // A sequential stream is enough for a list this small and yields the same grouping.
    Map<String, List<Emp>> listMap = list.stream().collect(Collectors.groupingBy(Emp::getEmpId));
    System.out.println(listMap);
    // One task per group.
    List<Paraller.ParallerAction> actions = new ArrayList<>(listMap.size());
    listMap.forEach((key, value) -> actions.add(new Action(key, value)));
    try {
        // Runs all tasks and blocks until every one of them has finished.
        Paraller.forEach(actions, null);
    } finally {
        // Shut the shared pool down exactly once, after all tasks are done. Doing it inside
        // a task would make the pool reject the groups that had not been submitted yet.
        // NOTE(review): presumably needed so idle pool threads do not keep the JVM alive - confirm.
        Paraller.shutDown();
    }
}

/**
 * The work done for one group: prints the group key followed by its employees.
 */
private static class Action implements Paraller.ParallerAction {
    private final String keyF;
    private final List<Emp> listF;

    /**
     * @param key  the employee id shared by the group
     * @param list the employees in the group
     */
    public Action(String key, List<Emp> list) {
        this.keyF = key;
        this.listF = list;
    }

    /** Prints this group as {@code key-------[employees]}. */
    @Override
    public void action() {
        System.out.println(keyF+"-------"+listF.toString());
    }
}
测试
public class TestMulThread {
public static void main(String[] args) {
MulThread mulThread = new MulThread();
mulThread.mulThreadRun();
}
}