SpringBoot中的定时任务详解

2021-11-16  本文已影响0人  Yan雪杉

在大多数项目应该不可避免会用到定时任务了,如果是单体项目的话,要实现一个定时任务还是比较简单的,可以通过Executors.newScheduledThreadPool(10)来实现,也可以通过SpringBootScheduled注解来实现。如果是分布式项目或者微服务的话,要实现一个定时任务就比较麻烦了,或者自己去实现,或者使用第三方的分布式定时任务框架,比如QuartzElastic-jobxxl-job等。

在我们的几个项目中都会用到定时任务,而且用得也都比较频繁,在微服务项目中使用的是xxl-job,在单体项目中,由于SpringBoot自带了定时任务的实现,但是默认的实现不是很友好,加上我们对于定时任务的管理要比较灵活,可以自由地对定时任务进行增删改查,所以我们就利用Executors.newScheduledThreadPool(10)来实现了。

首先,我们还是来看一下SpringBoot中的定时任务Scheduled是如何实现的。

SpringBoot项目中,如果想要实现定时任务的话,首先需要在启动类上添加@EnableScheduling注解,然后在定时任务的方法上添加上@Scheduled注解,这样一个简单的定时任务就实现了。

@EnableScheduling

这个注解是SpringBoot项目实现定时任务的关键,我们首先来观察一下它的内部实现,点进去这个注解可以发现@Import(SchedulingConfiguration.class),可以看到它会导入一个叫做SchedulingConfiguration的配置类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

再点进去的话,就可以发现这个配置类做的事情非常简单,就是new出了一个ScheduledAnnotationBeanPostProcessor对象,这个对象就是实现定时任务的关键。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}

我们可以看下ScheduledAnnotationBeanPostProcessor的实现定义,发现它还是实现了非常多的接口的,其中有一个接口是MergedBeanDefinitionPostProcessor接口,而这个接口又继承了BeanPostProcessor接口,BeanPostProcessor这个接口有两个方法需要去实现,分别为postProcessBeforeInitializationpostProcessAfterInitialization方法,分别在bean的初始化前和初始化后调用。

那么我们就来关注一下postProcessAfterInitialization方法的实现,这个方法其实就是去扫描被@Scheduled注解标记的定时任务,当扫描到之后,会对每个定时任务调用processScheduled方法,而processScheduled方法就是对@Scheduled注解中的参数进行解析,比如fixedDelaycron等等,解析完成之后再把它添加到定时任务的集合中。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

除了上述的接口以外,还有一个接口是ApplicationListener<ContextRefreshedEvent>,它会去监听ContextRefreshedEvent事件,当所有的bean都初始化完成并且装载完成的话,就会触发该事件,实现了这个接口的类就可以监听到这个事件,从而去实现自己的逻辑,这个接口只有一个方法定义onApplicationEvent(E event),所以当监听到ContextRefreshedEvent事件的时候,就会执行onApplicationEvent方法。

public class ScheduledAnnotationBeanPostProcessor
    implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
    Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
    SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {}

onApplicationEvent方法里面做的事也非常简单,就是调用内部的一个方法finishRegistrationfinishRegistraion方法的逻辑就比较复杂了,我们一一来看下

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Running in an ApplicationContext -> register tasks this late...
        // giving other ContextRefreshedEvent listeners a chance to perform
        // their work at the same time (e.g. Spring Batch's job registration).
        finishRegistration();
    }
}
private void finishRegistration() {
    // scheduler可以自己去实现,这个scheduler就是执行定时任务的线程池,可以自己去实现TaskScheduler,也就是使用jdk自带的ScheduledExecutorService
    // 具体可以看下setScheduler这个方法
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }

    // 查找SchedulingConfigurer配置类,然后加载配置,这个配置类也可以自己去实现,在这个配置类中也可以去指定定时任务的线程池
    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    // 这个registrar中就保存了被@Scheduled注解标注的定时任务集合,之后会讲到如何从其中获取定时任务集合,并且进行任务的取消
    // 如果存在被@Scheduled注解标记的定时任务,但是scheduler为null的话,就会尝试去搜索TaskScheduler,没有找到的话就抛出异常
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            if (logger.isTraceEnabled()) {
                logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
                        ex.getMessage());
            }
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            if (logger.isTraceEnabled()) {
                logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
                        ex.getMessage());
            }
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
                            ex2.getMessage());
                }
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
                            ex2.getMessage());
                }
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }

    // 最后会执行这个方法
    this.registrar.afterPropertiesSet();
}
@Override
public void afterPropertiesSet() {
    scheduleTasks();
}
protected void scheduleTasks() {

    // 在这个方法里面,可以发现,如果taskScheduler不存在的话,就会创建出一个执行器,这个执行器应该不陌生了
    // 它就是一个corePoolSize为单线程,maxPoolSize为Integer.MAX_VALUE,队列为DelayedWorkQueue的执行器
    // 当存在很多个定时任务同时执行的时候,只会有一个定时任务被执行,其他的定时任务会被扔进DelayedWorkQueue队列中
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    // 下面的这几个判断就是将被@Scheduled注解标记的定时任务添加到任务集合中
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    // 注意以下这个cron表达式的定时任务添加,后续我们去实现动态地对定时任务进行管理会用到
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            // 这里的scheduleCronTask还是值得关注的
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

到这里呢,被@Scheduled注解标记的方法就会被作为定时任务添加到定时任务集合中了。

从上面我们可以发现,对于默认的定时任务的实现,执行定时任务的线程池并不是很友好,我们可以去自定义实现执行定时任务的线程池,可以去实现TaskScheduler,也可以去创建ScheduledExecutorService,还可以去实现配置类SchedulingConfigurer

@Configuration
public class TestConfig {

    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setRemoveOnCancelPolicy(false);
        taskScheduler.initialize();
        return taskScheduler;
    }
}

如何获取定义的定时任务集合

在之前的描述中,我们可以发现在服务启动的时候,IOC容器中会注入一个ScheduledAnnotationBeanPostProcessor的Bean对象,这个Bean对象就是来对定时任务进行管理的,那么我们就可以从这个类中获取到定时任务的集合,并且将定时任务都打印出来看一下内容都是什么,可以发现ScheduledTasktoString()方法就是定时任务的全类名加上方法名,比如com.yan.shiyue.Task.task,这样的话,我们就可以将这些定时任务给保存起来,作为一个Map,key就是定时任务的名字,value就是ScheduledTask,然后我们就可以动态地对这些任务进行取消了,因为ScheduledTask提供了一个cancel方法来取消定时任务的执行。

@Slf4j
@Component
public class ScheduledTaskConfig implements CommandLineRunner {

    @Autowired
    private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;

    @Override
    public void run(String... args) {
        Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
        for (ScheduledTask task : tasks) {
            log.error(task.toString());
        }
    }
}

如何动态地创建定时任务

我们可以发现SpringBoot提供的定时任务并不是很灵活,我们没法动态地对定时任务进行增删改查,那么基于SpringBoot的定时任务的实现,我们可以自己来实现定时任务的动态操作。

在接下来的操作中,就以cron表达式类型的定时任务进行动态地增删改查,在实现之前我们回顾一下SpringBoot中的cron表达式类型的定时任务时如何被添加到任务集合中的。

protected void scheduleTasks() {

    // 在这个方法里面,可以发现,如果taskScheduler不存在的话,就会创建出一个执行器,这个执行器应该不陌生了
    // 它就是一个corePoolSize为单线程,maxPoolSize为Integer.MAX_VALUE,队列为DelayedWorkQueue的执行器
    // 当存在很多个定时任务同时执行的时候,只会有一个定时任务被执行,其他的定时任务会被扔进DelayedWorkQueue队列中
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    // 下面的这几个判断就是将被@Scheduled注解标记的定时任务添加到任务集合中
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    // 注意以下这个cron表达式的定时任务添加,后续我们去实现动态地对定时任务进行管理会用到
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            // 这里的scheduleCronTask还是值得关注的
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

可以发现,SpringBoot对几种定时任务都实现了对应的Task,比如cron表达式类型的CronTask,固定频率类型的IntervalTask等等,那么我们如果要动态地添加一个cron表达式类型的定时任务的话,就可以实现CronTask了。

那么,我们自己创建好一个CronTask之后该如何执行呢,之前有提到过SpringBoot执行定时任务的执行器可以自定义,那么我们在自定义好执行器TaskScheduler之后,就可以调用其中的schedule方法来执行定时任务了。

首先,我们需要创建好一个任务,需要实现Runnable接口。

public class TestTask implements Runnable {
    
    @Override
    public void run() {
        System.out.println(System.currentTimeMillis() + "shiyue");
    }
}

然后,我们可以去实现一个接口,来动态地管理这个定时任务。

@RestController
public class TestController {

    @Autowired
    private TaskScheduler taskScheduler;

    @Autowired
    private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;


    private final Map<Integer, ScheduledFuture> taskMap = new ConcurrentHashMap<>();

    /**
     * 添加一个定时任务
     *
     * @return
     */
    @GetMapping("/task")
    public String addTask() {
        // 这里为了方便,cron表达式写死了,其实可以由外部传入
        CronTask cronTask = new CronTask(new TestTask(), "*/5 * * * * ?");
        ScheduledFuture scheduledFuture = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        // 同时,这里也是为了方便,使用Map来保存定时任务的信息,其实可以将定时任务持久化到MySQL中
        taskMap.put(1, scheduledFuture);
        return "shiyue";
    }

    /**
     * 更新一个定时任务,更新一个定时任务可以看做是将原来的定时给取消掉,然后新增一个新的定时任务
     *
     * @return
     */
    @GetMapping("/task/update/{id}")
    public String updateTask(@PathVariable Integer id, @RequestParam String cron) {
        ScheduledFuture scheduledFuture = taskMap.get(id);
        scheduledFuture.cancel(true);

        // 添加
        CronTask cronTask = new CronTask(new TestTask(), cron);
        ScheduledFuture scheduledFuture1 = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        taskMap.put(id, scheduledFuture1);
        return "Success";
    }

    @GetMapping("/task/list")
    public String taskList() {
        Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
        for (ScheduledTask task : tasks) {
            System.out.println(task);
        }
        return "qiyue";
    }
}
上一篇下一篇

猜你喜欢

热点阅读