【方案设计】任务监控
1.业务场景:
1.1 场景一
餐饮POS打印,多处(客户端)下单,共享一个打印机,因为共享资源等情况,那么可能出现打印失败异常情况,针对这些打印失败任务,我们需要知道,并且自动重新打印。因此需要设计一套监控失败任务体系,打印失败后自动触发监控,执行相应的纠错措施。
1.2 场景二
电商退款场景,财务进行退款流程操作后,需要通知外部系统,同时定时调用银行接口退款,是否退款完成状态需要通知各个系统。 因此需要设计一个后台监控程序,收到各种消息后,通知各个系统。
2.设计思路
(1)首先,监控程序是在后台一直运行(否则无法监控),因此利用多线程启动监控程序,一直运行在后台。
(2)需要有一个队列存放新产生的任务,因为同一时刻可能产生大量任务,系统会来不及处理。
(3)监控程序要一直运行,因此需要设计为用无限循环(while (true)),这个无限循环可以阻塞挂起(没有监控到任务时),如果监控到新产生的任务时能继续执行。
针对第1、2点提到队列,阻塞进程的问题,使用阻塞队列LinkedBlockingQueue存放产生的任务可以完美适配。该队列在线程中是安全的、先进先出、生成消费型首先、阻塞队列。当使用take()方法获取队列信息时,一旦队列为空,则进入阻塞状态,等待新任务,一旦有新任务产生调用put()方法,则阻塞解除,程序继续执行。我们把队列设计为静态全局变量。目前已知有两种可行的方案
3 方案一
业务场景一采用此方案,具体步骤如:
(1)启动采用使用static {},创建后台任务监控任务(一个线程)。
(2)多线程使用scheduledThreadPool来调度执行监控任务,监控任务一直运行在后台。
(3)监控任务使用无限循环结合LinkedBlockingQueue队列
以下是监控pos打印失败,失败任务重新打印的例子:
private static ScheduledExecutorService scheduledThreadPool = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private static boolean ERROR_MONITOR_THREAD_INITED = false;
static {
if (!ERROR_MONITOR_THREAD_INITED) {
scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
PrintTask printTask;
LOGGER.debug("=============Start a error print job process thread!~===========");
try {
while (true) {
//如果队列为空,则阻塞,就一直等待有数据进来(挂起)
printTask = errorJobQueue.take();
PrintTaskManager.doPrint(printTask);
}
} catch (Exception e) {
}
}
}
, 0, 5, TimeUnit.SECONDS);
ERROR_MONITOR_THREAD_INITED = true;
}
}
- 在载入打印任务管理类时,就启动实时监控---static{}。
- 启动后每隔5秒调度执行一个线程,线程任务就是把失败的打印任务重打,用while不停循环从失败任务队列获取任务重打。也就是说一直有一个线程在执行重打错误打印任务 ,为了防止重打任务异常导致整个监控线程结束,因此才有了每隔5秒再次启动监控重打错误任务。
- 所有的错误任务放在一个队列中
4.方案二
业务场景一采用此方案,具体步骤如:
(1)项目启动后开启监听程序,通过实现ApplicationRunner接口来实现,ApplicationRunner具体用法参照我另外一篇文章
(2)创建监听任务,可以有多个,比如退款通知refundNotify,开票通知invoiceNotify、失败后从新发起的通知timingRetryNotify任务等,每监听任务就是一个线程
,然后再启用线程池调度执行各个监听任务,监听任务启动后一直运行在后台
@Override
public void run(ApplicationArguments args)
{
String url01 = url + "/erp2ld_refund_status";
String url02 = url + "/erp2ld_inv_status";
RefundNotify refundNotify = new RefundNotify(refundQueue, mRefundNotifyMapper, url01);
InvoiceNotify invoiceNotify = new InvoiceNotify(invoiceQueue, mApplicationNotifyMapper, url02);
TimingRetryNotify timingRetryNotify = new TimingRetryNotify(timingRetryQueue, mRefundNotifyMapper, mApplicationFormMapper,
mApplicationNotifyMapper, url01, url02);
//获取系统处理器个数,作为线程池数量
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("member-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(refundNotify);
pool.execute(invoiceNotify);
pool.execute(timingRetryNotify);
}
(3)监听任务是一个线程,使用无限循环结合LinkedBlockingQueue队列
如果有新任务产生,任务标识放入队列即可,挂起的线程继续执行任务。
@SneakyThrows
@Override
public void run()
{
while (true)
{
List<com.andy.modules.erp.member.pojo.RefundNotify> list = mRefundNotifyMapper.selectToNotify();
if (list.size() > 0)
{
for (com.andy.modules.erp.member.pojo.RefundNotify mRefundNotify : list)
{
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(3000);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(15000);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.execute(
(RetryCallback<Object, Throwable>) retryContext -> {
refundNotify(mRefundNotify);
return "通知成功";
},
retryContext -> "通知失败");
}
}
// 如果没有通知,则阻塞
logger.info(queue.take());
}
}
(4)如果任务执行失败,比如退款通知失败(调用外部系统),无法收到触发任务,因此需要定时任务每隔一段时间把退款通知失败的任务重新发起。
@Component("MNotifyTask")
public class NotifyTask implements ITask {
private static Logger logger = LoggerFactory.getLogger(AutoRepayServiceImpl.class);
@Override
public void run(String params) throws InterruptedException {
//定时推送数据到LD
logger.info("定时推送数据到LD");
StartService.timingRetryQueue.put("定时推送数据到LD");
}
}