风样的程序员

Timer与ScheduledExecutorService异常

2018-06-11  本文已影响30人  goodchax

足以让你软件稳定运行,躺着把钱赚了的细节!
去年我的小团伙,帮一家公司开发一个终端推送服务系统。在刚上线不久出现了稳定性问题,716台终端数据不推送(如果无法解决,将对该公司造成716*2000的经济损失),经过几轮测试,发现问题很有可能在于定时器部分,然后我们阅读大量文献,基本上定位到问题在于定时器遇到exception的关闭与挂起。

Timer的缺陷

Timer被设计成支持多个定时任务,通过源码发现它有一个任务队列用来存放这些定时任务,并且启动了一个线程来处理
通过这种单线程的方式实现,在存在多个定时任务的时候便会存在问题:若任务B执行时间过长,将导致任务A延迟了启动时间!
还存在另外一个问题,应该是属于设计的问题:若任务线程在执行队列中某个任务时,该任务抛出异常,将导致线程因跳出循环体而终止,即Timer停止了工作!
同样是举个栗子:

public static void main(String[] args) {  
      
    Timer timer = new Timer();  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " A: do task");  
        }  
    }, 0, 5*1000);  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " B: sleep");  
            try {  
                Thread.sleep(20*1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }, 10*1000, 5000);  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " C: throw Exception");  
            throw new RuntimeException("test");  
        }  
    }, 30*1000, 5000);  
}  

通过以上程序发现:一开始,任务A能正常每隔5秒运行一次。在任务B启动后,由于任务B运行时间需要20秒,导致任务A要等到任务B执行完才能执行。更可怕的是,任务C启动后,抛了个异常,定时任务挂了!
不过这种单线程的实现也有优点:线程安全!

ScheduledThreadPoolExecutor简介

ScheduledThreadPoolExecutor可以说是Timer的多线程实现版本,连JDK官方都推荐使用ScheduledThreadPoolExecutor替代Timer。它是接口ScheduledExecutorService的子类
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以本质上说ScheduledThreadPoolExecutor还是一个线程池(可参考《Java线程池ThreadPoolExecutor简介》)。它也有coorPoolSize和workQueue,接受Runnable的子类作为任务。

特殊的地方在于它实现了自己的工作队列DelayedWorkQueue,该任务队列的作用是按照一定顺序对队列中的任务进行排序。比如,按照距离下次执行时间的长短的升序方式排列,让需要尽快执行的任务排在队首,“不那么着急”的任务排在队列后方,从而方便线程获取到“应该”被执行的任务。除此之外,ScheduledThreadPoolExecutor还在任务执行结束后,计算出下次执行的时间,重新放到工作队列中,等待下次调用。

上面通过一个程序说明了Timer存在的问题!这里我将Timer换成了用ScheduledThreadPoolExecutor来实现,注意TimerTask也是Runnable的子类。

public static void main(String[] args) {  
    int corePoolSize = 3;  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize);    
         
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " A: do task");  
        }  
    }, 0 ,5, TimeUnit.SECONDS);    
      
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " B: sleep");  
            try {  
                Thread.sleep(20*1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }, 10, 5, TimeUnit.SECONDS);  
      
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " C: throw Exception");  
            throw new RuntimeException("test");  
        }  
    }, 30, 5, TimeUnit.SECONDS);  
}  

由于有3个任务需要调度,因此我将corePoolSize设置为3。通过控制台打印可以看到这次任务A一直都在正常运行(任务时间间隔为5秒),并不受任务B的影响。任务C抛出异常后,虽然本身停止了调度,但没有影响到其他任务的调度。可以说ScheduledThreadPoolExecutor解决Timer存在的问题!
那要是将corePoolSize设置为1,变成单线程跑呢?结果当然是和Timer一样,任务B会导致任务A延迟执行,不过比较好的是任务C抛异常不会影响到其他任务的调度。

可以说ScheduledThreadPoolExecutor适用于大部分场景,甚至就算timer提供的Date参数类型的开始时间也可以通过自己转的方式来实现。任务调度框架Quatz也是在ScheduledThreadPoolExecutor基础上实现的。

一般我们都使用单线程版的ScheduledThreadPoolExecutor居多,推荐通过以下方式来构建(构建后其线程数就不可更改)

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();  
Timer异常后,任务就终止

如果Timer报错的时候还要继续执行任务,解决方法:在异常处理中加代码。

另一种解决办法:java.util.concurrent.ScheduledExecutorService;

ScheduledExecutorService异常后,任务会被挂起,解决方法:在异常处理中加代码。

public static ScheduledExecutorService executorService = Executors  
        .newScheduledThreadPool(1);  
  
/** 
 * 1分钟执行一次 
 */  
public static void runTimer() {  
    executorService.scheduleAtFixedRate(new Runnable() {  
        @Override  
        public void run() {  
            try {  
                .....               
            } catch (Exception e) {  
                e.printStackTrace();  
                ......
            }  
        }  
    }, 0, 60, TimeUnit.SECONDS);  
}  
RocketMQ源码分析:
public void start() {
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            if (timeDelay != null) {
                this.timer.schedule(new DeliverScanJobTimerTask(level), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
//                    log.info("scheduleAtFixedRate");
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 0, 2000);
    }

    public synchronized void persist() {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            log.error("persist file [{}] exception", e);
        }
    }

    public void shutdown() {
        log.info("Shutdown");
        this.timer.cancel();
    }

    public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(32);
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        String levelString = this.store.getBrokerConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);

                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }

    public Long getLevelDelayTime(int delayLevel) {
        return delayLevelTable.get(delayLevel);
    }

    class DeliverScanJobTimerTask extends TimerTask {

        private final int delayLevel;

        public DeliverScanJobTimerTask(int delayLevel) {
            this.delayLevel = delayLevel;
        }

        @Override
        public void run() {
            try {
                //
                TarminalManager tarminalManager = ScheduleScanJobService.this.store.getTarminalManager();
                List<TarminalPO> tarminals = tarminalManager.scan(delayLevel);
                for (TarminalPO tarminalPO : tarminals) {
                    this.executeOnTimeup(tarminalPO);
                }

            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleScanJobService.this.timer.schedule(new DeliverScanJobTimerTask(
                        this.delayLevel), DELAY_FOR_A_PERIOD);
            } finally {
                ScheduleScanJobService.this.timer.schedule(
                        new DeliverScanJobTimerTask(this.delayLevel), ScheduleScanJobService.this.getLevelDelayTime(delayLevel));
            }
        }

        public void executeOnTimeup(final TarminalPO tarminalPO) {
            ScheduleScanJobService.this.store.getSendMessageExecutor().submit(new Runnable() {
                @Override
                public void run() {
                    ScheduleScanJobService.log.info("Send Message Executor : {} => {} : {}", delayLevel, ScheduleScanJobService.this.getLevelDelayTime(delayLevel), tarminalPO.getCode());
                    TarminalSet tarminalSet = null;
                    byte[] data = null;
                    {
                        // TODO 通过接口获取消息和终端信息
                    }
                    {
                        //发送内容
                        String content = "这是一个特殊时刻,左手右手一个慢动…作!一起摇摆";
                        //屏参数设置
                        tarminalSet = new TarminalSet();
                        tarminalSet.setInfoModelNormal(5);
                        tarminalSet.setInfoSpeed(0x00);
                        tarminalSet.setPropertyWidth(256);
                        tarminalSet.setPropertyHeight(64);
                        tarminalSet.setInfoTimeStay(4);
                        //bytes
                        data = LedManager.getInstance().sendOntimeMessage(tarminalPO.getCode(), content, tarminalSet, null);
                    }
                    // 发送
                    SendFactory.factory(tarminalSet).doSend(tarminalPO.getCode(), data);
                }
            });
        }

    }
上一篇下一篇

猜你喜欢

热点阅读