多线程使用轮子
2020-10-21 本文已影响0人
帮我的鸵鸟盖个章
构造一个对象,获取这个对象的list,然后循环去执行。
自定义线程池仓库
import java.util.concurrent.ExecutorService;
/**
* 自定义线程池仓库
*/
public final class CustomPoolStore {
private static int otherPool2Size=15;//同步erp默认线程数
private static int otherPoolSize=30;//同步other默认线程数
private CustomPoolStore(){};
/**
* 获取OtherPool2线程池
* @return
*/
public static final ExecutorService getOtherPool2(){
return OtherPool2.get();
}
/**
* 获取other线程池
* @return
*/
public static final ExecutorService getOtherPool(){
return OtherPool.get();
}
private static class OtherPool2{
private static ExecutorService pool=ExecutorUtils.create(otherPool2Size);
public static ExecutorService get(){
return pool;
}
}
/**
* other线程池
*/
private static class OtherPool{
private static ExecutorService pool=ExecutorUtils.create(otherPoolSize);
public static ExecutorService get(){
return pool;
}
}
}
异步请求参数基类
import com.*.common.utils.Feedback;
import lombok.Data;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.Serializable;
/**
* 异步请求参数基类
*/
@SuppressWarnings({ "serial", "rawtypes" })
@Data
public abstract class AbstractAsynDto<S,D extends AbstractAsynDto> implements Serializable{
private S service;
public abstract Feedback run();
@SuppressWarnings("unchecked")
public D service(S service){
this.setService(service);
return (D) this;
}
public Feedback execute(){
Feedback result=null;
try {
result=this.run();
} catch (Exception e) {
result=Feedback.faild("777777",e.getMessage(),ExceptionUtils.getStackTrace(e));
}
return result;
}
}
线程池工具类
import lombok.Data;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
/**
* 线程池工具类
*/
public final class ExecutorUtils {
/**
* 默认配置
*/
@Data
public static class ThreadPoolConfig{
private int corePoolSize=10;//核心线程数
private int maximumPoolSize=10;//最大线程数
private long keepAliveTime=60;//存活时间
private TimeUnit unit=TimeUnit.SECONDS;//存活时间单位
private BlockingQueue<Runnable> workQueue=new LinkedBlockingQueue<>();//默认工作队列数量,无上限
private ThreadFactory threadFactory=Executors.defaultThreadFactory();//默认线程
private RejectedExecutionHandler handler=new AbortPolicy();//默认抛出一场
}
/**
* 创建默认线程池
* @param corePoolSize
* @return
*/
public static ExecutorService create(int corePoolSize){
ThreadPoolConfig config=new ThreadPoolConfig();
config.setCorePoolSize(corePoolSize);
config.setMaximumPoolSize(corePoolSize);
return create(config);
}
/**
* 自定义配置参数
* @param config
* @return
*/
public static ExecutorService create(ThreadPoolConfig config){
return new java.util.concurrent.ThreadPoolExecutor(config.getCorePoolSize(),
config.getMaximumPoolSize(),config.getKeepAliveTime(),config.getUnit(),
config.getWorkQueue(),config.getThreadFactory(),config.getHandler());
}
}
使用
建UserDto
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
@SuppressWarnings("serial")
@Builder
@Data
@EqualsAndHashCode(callSuper=true)
public class UserDto extends AbstractAsynDto<IUserService, UserDto> {
private String Name;
private Boolean isMan;
@Override
public Feedback run() {
return this.getService().updateUser(this);
}
}
建IUserService
public interface IUserService {
public Feedback updateUser(UserDto re);
}
建UserServiceImpl
@Service
public class UserServiceImpl implements IUserService{
@Transactional
@Override
// @SuppressWarnings("unchecked")
public Feedback updateUser(UserDto re) {
//执行代码
return Feedback.succeed();
}
}
具体调用
@Autowired
private IUserService userService;
List<UserDto> userDtoList = new ArrayList<>();
userDtoList.add(userDto1);
Map<String,Future<Feedback>> exeResult = new HashMap<>();
userDtoList.forEach(l-> exeResult.put(l.getName(),CustomPoolStore.getOtherPool().submit(l.service(this.userService)::execute)));
需要注意的是事务,由于线程绑定的原因,具体调用
的类不能和UserServiceImpl
在同一个类,不然事务不起作用