【并发编程技术】线程池监控技术分析操作
假如现在有好几个业务子系统共同完成一个任务,当大量的任务来临时,怎么去保证每个任务都能跑完?这就需要我们对任务进行监控,我的思路是这样的:创建一个新的监控项目,定时遍历数据库的任务表,看看有没有新来的任务,有的话取出来,加入一个待执行队列,并计算超时时间,再开一个线程去判断该任务是否跑完,若超时了,则触发重发机制,若跑完了,则关闭线程。
这里有三个问题要特别注意
- 监控线程不得对原有业务造成侵入,必须是额外的。
- 一定要保证监控到每个任务,可以统一线程监控全部,也可以每个任务开一个线程。
- 超时时间的计算(由于任务多,会堆在队列中,所以每个任务的超时时间都不同)
实施这个监控的步骤
- 在业务中,每完成一个子业务,就把下一个环节的路径和数据存储,这里用kafka做消息传递,因此保存队列、数据和状态即可。
- 在监控项目中,超时时间的计算方法
超时时间 = 任务执行预算时间 * 队列个数(队列必须用先进先出策略)
超时类
public class CmdRecordMonitor {
//起始时间
private Long startTime;
//终止时间
private Long overTime;
//重发次数限制
private int refairTime;
...
setter and getter
}
超时队列(LinkedBlockingQueue)
监听任务是否超时
for(String cmdId : cmdIds) {
if(!CmdQueue.PRE_CMD_QUEUE.contains(cmdId)){
CmdQueue.PRE_CMD_QUEUE.add(cmdId);
...
//超时时间
Integer overtime = jsArray.size() * CmdQueue.PRE_CMD_QUEUE.size()
* 2000;
Long overtime2 = overtime.longValue();
cmdRecordMonitor.setOverTime(overtime2);
//保存监控任务
CmdMonitor cmdMonitor = new CmdMonitor();
CmdMonitor cmdMonitor2 = monitorService.getCmdMonitor(cmdId);
if(cmdMonitor2 == null) {
cmdMonitor.setCmdId(cmdInfo.getCmdId());
cmdMonitor.setCmdType(cmdInfo.getCmdType());
cmdMonitor.setCreateTime(new Date());
cmdMonitor.setStatus(1);
monitorService.saveCmdMonitor(cmdMonitor);
}else{
cmdMonitor = cmdMonitor2;
}
//启动线程监控该程序
...
}
}
判断超时和重发,只给个大体思路,具体业务代码我剔除掉了
@Override
public void run() {
logger.info("--------------------center-monitor:启动任务监控线程--------------------");
boolean flg = false;
//让线程一直循环,除非程序崩溃
while(!flg){
try{
Long nowtime = System.currentTimeMillis();
Long last = cmdRecordMonitor.getStartTime();
//先判断是否完成了
Date cmdTime = ...
if(cmdTime != null) {
logger.info("--------------------center-monitor:任务按时完成,
关闭监控线程--------------------");
CmdQueue.PRE_CMD_QUEUE.remove(cmdId);
flg=true;
return;
}
//超时,从队列里面踢出
if(nowtime - last > cmdRecordMonitor.getOverTime() && status < 5) {
logger.info("----------------center-monitor:超时了---------------");
//记录异常,通知异常处理
//判断状态在哪一步
if(cmdInfo.getStatus() == 1) {
//清除数据
//重新发任务
}
else if(cmdInfo.getStatus() == 2) {
}
else if(cmdInfo.getStatus() == 3) {
}
//再次加入监控
CmdQueue.PRE_CMD_QUEUE.remove(cmdId);
flg=true;
...
}
}catch(Exception e){
e.printStackTrace();
}
}
}