【方案设计】任务监控

2021-11-12  本文已影响0人  嘻洋洋

1.业务场景:

1.1 场景一

餐饮POS打印,多处(客户端)下单,共享一个打印机,因为共享资源等情况,那么可能出现打印失败异常情况,针对这些打印失败任务,我们需要知道,并且自动重新打印。因此需要设计一套监控失败任务体系,打印失败后自动触发监控,执行相应的纠错措施。

1.2 场景二

电商退款场景,财务进行退款流程操作后,需要通知外部系统,同时定时调用银行接口退款,是否退款完成状态需要通知各个系统。 因此需要设计一个后台监控程序,收到各种消息后,通知各个系统。

2.设计思路

(1)首先,监控程序是在后台一直运行(否则无法监控),因此利用多线程启动监控程序,一直运行在后台。
(2)需要有一个队列存放新产生的任务,因为同一时刻可能产生大量任务,系统会来不及处理。
(3)监控程序要一直运行,因此需要设计为用无限循环(while (true)),这个无限循环可以阻塞挂起(没有监控到任务时),如果监控到新产生的任务时能继续执行。

针对第1、2点提到队列,阻塞进程的问题,使用阻塞队列LinkedBlockingQueue存放产生的任务可以完美适配。该队列在线程中是安全的、先进先出、生成消费型首先、阻塞队列。当使用take()方法获取队列信息时,一旦队列为空,则进入阻塞状态,等待新任务,一旦有新任务产生调用put()方法,则阻塞解除,程序继续执行。我们把队列设计为静态全局变量。目前已知有两种可行的方案

3 方案一

业务场景一采用此方案,具体步骤如:
(1)启动采用使用static {},创建后台任务监控任务(一个线程)。
(2)多线程使用scheduledThreadPool来调度执行监控任务,监控任务一直运行在后台。
(3)监控任务使用无限循环结合LinkedBlockingQueue队列

以下是监控pos打印失败,失败任务重新打印的例子:

    private static ScheduledExecutorService scheduledThreadPool = Executors
           .newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
     private static boolean ERROR_MONITOR_THREAD_INITED = false;
     static {
       if (!ERROR_MONITOR_THREAD_INITED) {
           scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
               @Override
               public void run() {
                   PrintTask printTask;
                   LOGGER.debug("=============Start a error print job process thread!~===========");
                   try {
                       while (true) {
                         //如果队列为空,则阻塞,就一直等待有数据进来(挂起)
                           printTask = errorJobQueue.take();
                           PrintTaskManager.doPrint(printTask);
                       }
                   } catch (Exception e) {
                   }
               }
           }
           , 0, 5, TimeUnit.SECONDS);
           ERROR_MONITOR_THREAD_INITED = true;
       }
   }

4.方案二

业务场景一采用此方案,具体步骤如:
(1)项目启动后开启监听程序,通过实现ApplicationRunner接口来实现,ApplicationRunner具体用法参照我另外一篇文章
(2)创建监听任务,可以有多个,比如退款通知refundNotify,开票通知invoiceNotify、失败后从新发起的通知timingRetryNotify任务等,每监听任务就是一个线程,然后再启用线程池调度执行各个监听任务,监听任务启动后一直运行在后台

    @Override
    public void run(ApplicationArguments args)
    {
        String url01 = url + "/erp2ld_refund_status";
        String url02 = url + "/erp2ld_inv_status";
        RefundNotify refundNotify = new RefundNotify(refundQueue, mRefundNotifyMapper, url01);
        InvoiceNotify invoiceNotify = new InvoiceNotify(invoiceQueue, mApplicationNotifyMapper, url02);
        TimingRetryNotify timingRetryNotify = new TimingRetryNotify(timingRetryQueue, mRefundNotifyMapper, mApplicationFormMapper,
                mApplicationNotifyMapper, url01, url02);
        //获取系统处理器个数,作为线程池数量
        int nThreads = Runtime.getRuntime().availableProcessors();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("member-pool-%d").build();
        //Common Thread Pool
        ExecutorService pool = new ThreadPoolExecutor(5, 200,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        pool.execute(refundNotify);
        pool.execute(invoiceNotify);
        pool.execute(timingRetryNotify);
    }

(3)监听任务是一个线程,使用无限循环结合LinkedBlockingQueue队列
如果有新任务产生,任务标识放入队列即可,挂起的线程继续执行任务。

    @SneakyThrows
    @Override
    public void run()
    {
        while (true)
        {
            List<com.andy.modules.erp.member.pojo.RefundNotify> list = mRefundNotifyMapper.selectToNotify();
            if (list.size() > 0)
            {
                for (com.andy.modules.erp.member.pojo.RefundNotify mRefundNotify : list)
                {
                    RetryTemplate retryTemplate = new RetryTemplate();
                    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                    backOffPolicy.setInitialInterval(3000);
                    backOffPolicy.setMultiplier(2);
                    backOffPolicy.setMaxInterval(15000);
                    retryTemplate.setBackOffPolicy(backOffPolicy);
                    retryTemplate.execute(
                            (RetryCallback<Object, Throwable>) retryContext -> {
                                refundNotify(mRefundNotify);
                                return "通知成功";
                            },
                            retryContext -> "通知失败");
                }
            }
          // 如果没有通知,则阻塞
            logger.info(queue.take());
        }
    }

(4)如果任务执行失败,比如退款通知失败(调用外部系统),无法收到触发任务,因此需要定时任务每隔一段时间把退款通知失败的任务重新发起。

@Component("MNotifyTask")
public class NotifyTask implements ITask {
    private static Logger logger = LoggerFactory.getLogger(AutoRepayServiceImpl.class);
    @Override
    public void run(String params) throws InterruptedException {
        //定时推送数据到LD
        logger.info("定时推送数据到LD");
        StartService.timingRetryQueue.put("定时推送数据到LD");
    }
}
上一篇下一篇

猜你喜欢

热点阅读