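To use scheduled tasks in Spring Boot, enable scheduling support with @EnableScheduling and add the @Scheduled annotation to each method that should be scheduled. That is all it takes to turn on scheduling in a project, and the execution period and frequency can be controlled flexibly through cron, fixedRate, fixedDelay, and similar attributes.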
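To make the difference between fixedRate and fixedDelay concrete, here is a small plain-Java model (not Spring code) that computes when each run would start on a single scheduler thread. The class and method names are made up for illustration, and the model assumes every run takes the same amount of time.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: computes the start times (in ms) of the first n runs.
class SchedulingModes {

    // fixedRate: runs are planned every `period` ms, counted from the first start.
    // On a single thread a slow run pushes the next start back until it has finished.
    static List<Long> fixedRateStarts(long period, long taskDuration, int n) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < n; i++) {
            long start = Math.max(i * period, previousEnd);
            starts.add(start);
            previousEnd = start + taskDuration;
        }
        return starts;
    }

    // fixedDelay: the next run starts `delay` ms after the previous run finished.
    static List<Long> fixedDelayStarts(long delay, long taskDuration, int n) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < n; i++) {
            starts.add(start);
            start = start + taskDuration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedRateStarts(1000, 300, 4));  // prints [0, 1000, 2000, 3000]
        System.out.println(fixedDelayStarts(1000, 300, 4)); // prints [0, 1300, 2600, 3900]
    }
}
```

With a 300 ms task, a fixed rate of 1000 ms keeps the starts on a 1000 ms grid, while a fixed delay of 1000 ms spaces them 1300 ms apart.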
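1.1 Drawback
- Once the period has been specified, changing it requires restarting the application.
What the approach in this article aims to provide:
- Hot updates of a scheduled task's execution period, based on cron expressions and with support for external storage such as a database or Nacos
- Minimal changes to stay compatible with existing scheduled tasks (only one extra annotation is needed)
- Adding scheduled tasks dynamically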
2.1 @EnableScheduling imports the configuration class SchedulingConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
2.2 SchedulingConfiguration configures a single bean, ScheduledAnnotationBeanPostProcessor. As the name suggests, this class implements the BeanPostProcessor interface.
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
2.3 The postProcessAfterInitialization implementation of ScheduledAnnotationBeanPostProcessor shows that the method which actually handles @Scheduled and sets up the scheduled task is processScheduled.
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        // Build the mapping from the bean's methods to their @Scheduled annotations
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    // Process each @Scheduled annotation
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
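The scanning step in this method, finding the methods of a bean class that carry a given annotation, can be modeled with plain reflection. The sketch below is a simplified illustration and not Spring's code: the @Every annotation and the ReportJobs class are hypothetical stand-ins for @Scheduled and a real bean, and it skips proxies, repeatable annotations, and inherited methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScan {

    // Returns method name -> interval for every declared method carrying @Every.
    static Map<String, Long> scan(Class<?> targetClass) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(ReportJobs.class)); // prints {refreshCache=5000, sendReport=60000}
    }
}

// Hypothetical stand-in for @Scheduled, defined here so the sketch has no dependencies.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}

// Hypothetical bean with two annotated methods and one plain method.
class ReportJobs {
    @Every(millis = 5000)
    public void refreshCache() { }

    @Every(millis = 60000)
    public void sendReport() { }

    public void notScheduled() { }
}
```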
2.4 Only the key part of ScheduledAnnotationBeanPostProcessor.processScheduled that handles cron expressions is shown below.
private final ScheduledTaskRegistrar registrar;

public ScheduledAnnotationBeanPostProcessor() {
    this.registrar = new ScheduledTaskRegistrar();
}

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        // Wrap the scheduled method in a Runnable
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

        // Determine initial delay
        // Handling of scheduled.initialDelay() omitted...

        // Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                // Resolve ${...} placeholders
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    // Create the CronTrigger and register the CronTask
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // Handling of fixedDelay and fixedRate, and storing the ScheduledTask for later destruction, omitted...
    }
    // catch block omitted...
}
As shown above, cron tasks are registered or initialized through this.registrar.scheduleCronTask.
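Because the CronTrigger above is built from the cron string that was resolved at registration time, a later configuration change is not visible to it. The plain-Java sketch below (no Spring involved; FixedExpression and DynamicExpression are names made up for illustration) contrasts a value captured once with a value looked up on every call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class TriggerLookup {

    // Expression captured once, the way a constructor argument is.
    static final class FixedExpression {
        private final String expression;

        FixedExpression(String expression) {
            this.expression = expression;
        }

        String current() {
            return expression;
        }
    }

    // Expression resolved on every call, so later config changes are visible.
    static final class DynamicExpression {
        private final Supplier<String> source;

        DynamicExpression(Supplier<String> source) {
            this.source = source;
        }

        String current() {
            return source.get();
        }
    }

    public static void main(String[] args) {
        // The map stands in for an external config store.
        Map<String, String> config = new ConcurrentHashMap<>();
        config.put("cron.name1", "0/5 * * * * ?");

        FixedExpression fixed = new FixedExpression(config.get("cron.name1"));
        DynamicExpression dynamic = new DynamicExpression(() -> config.get("cron.name1"));

        config.put("cron.name1", "0/2 * * * * ?"); // simulated config update

        System.out.println(fixed.current());   // prints 0/5 * * * * ?
        System.out.println(dynamic.current()); // prints 0/2 * * * * ?
    }
}
```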
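3. Implementing dynamic scheduled tasks
Approach: override ScheduledAnnotationBeanPostProcessor.processScheduled, change the part that handles cron, and register or initialize the scheduled task through this.registrar.scheduleTriggerTask instead.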
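A Trigger makes runtime changes possible because the scheduler asks it for the next execution time again after every run. A rough JDK-only model of that loop is sketched below. It is not Spring's implementation: ReschedulingLoop and its Supplier-based trigger are hypothetical stand-ins, and the trigger returns a delay in milliseconds, or null when there is nothing to schedule.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Simplified model of trigger-based scheduling.
class ReschedulingLoop implements Runnable {
    private final ScheduledExecutorService executor;
    private final Runnable task;
    // Hypothetical trigger: delay in ms until the next run, or null to stop.
    private final Supplier<Long> nextDelayMillis;

    ReschedulingLoop(ScheduledExecutorService executor, Runnable task, Supplier<Long> nextDelayMillis) {
        this.executor = executor;
        this.task = task;
        this.nextDelayMillis = nextDelayMillis;
    }

    // Ask the trigger for the next delay and schedule exactly one run.
    void scheduleNext() {
        Long delay = nextDelayMillis.get();
        if (delay != null) {
            executor.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void run() {
        task.run();
        scheduleNext(); // the trigger is consulted again after every run
    }

    // Runs a task three times; the delay is shortened after the first run.
    static boolean runThreeTimes() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(3);
        AtomicLong delayMillis = new AtomicLong(50);
        Runnable task = () -> {
            remaining.countDown();
            delayMillis.set(10); // simulated config change, picked up by the next lookup
        };
        Supplier<Long> trigger = () -> {
            if (remaining.getCount() == 0) {
                return null; // nothing left to schedule
            }
            return delayMillis.get();
        };
        new ReschedulingLoop(executor, task, trigger).scheduleNext();
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runThreeTimes() ? "completed 3 runs" : "timed out");
    }
}
```

Since the delay is read on every scheduling step, changing the value between runs takes effect without recreating the task, which is the property the dynamic cron trigger relies on.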
3.1 Class diagram
[Class diagram: DynamicCronScheduleTaskManager implements DisposableBean (+destroy() : void); the rest of the diagram is not recoverable.]
3.2 DynamicCronScheduleTaskManager
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;

import java.util.HashMap;
import java.util.Map;

public class DynamicCronScheduleTaskManager implements DisposableBean {

    private Map<String, ScheduledTask> dynamicScheduledTaskMap = new HashMap<>();

    // Must be assigned before any task is added
    ScheduledTaskRegistrar registrar;

    // Add a scheduled task, replacing any task already registered under the same name
    public ScheduledTask addTriggerTask(String cronName, TriggerTask task) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
        if (scheduledTask != null) {
            scheduledTask.cancel();
        }
        scheduledTask = this.registrar.scheduleTriggerTask(task);
        dynamicScheduledTaskMap.put(cronName, scheduledTask);
        return scheduledTask;
    }

    public boolean contains(String cronName) {
        return this.dynamicScheduledTaskMap.containsKey(cronName);
    }

    // Refresh the trigger timing of a scheduled task by cancelling and rescheduling it
    public void updateTriggerTask(String cronName) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
        if (scheduledTask == null) {
            throw new IllegalStateException("Invalid cronName \"" + cronName + "\", no ScheduledTask found");
        }
        scheduledTask.cancel();
        scheduledTask = this.registrar.scheduleTriggerTask((TriggerTask) scheduledTask.getTask());
        dynamicScheduledTaskMap.put(cronName, scheduledTask);
    }

    // Remove a scheduled task
    public void removeTriggerTask(String cronName) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.remove(cronName);
        if (scheduledTask != null) {
            scheduledTask.cancel();
        }
    }

    @Override
    public void destroy() throws Exception {
        for (ScheduledTask value : dynamicScheduledTaskMap.values()) {
            value.cancel();
        }
        this.dynamicScheduledTaskMap.clear();
    }
}
3.3 AbstractDynamicCronHandler
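import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractDynamicCronHandler {

    @Autowired
    protected DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;

    // Return the current cron expression for the given name
    public abstract String getCronexpression(String cronName);

    // Refresh the trigger timing of the task registered under the given name
    public void updateTriggerTask(String cronName) {
        dynamicCronScheduleTaskManager.updateTriggerTask(cronName);
    }
}
3.4 EnvironmentDynamicCronHandler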
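This handler is based on Environment: when the configuration is refreshed, the trigger timing of the scheduled tasks is refreshed automatically. It also works for distributed, multi-node cluster deployments.
For example, if the cron expression is configured in Nacos, updating the configuration there updates the trigger timing of the scheduled task, because the handler listens for EnvironmentChangeEvent.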
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;

public class EnvironmentDynamicCronHandler extends AbstractDynamicCronHandler implements EnvironmentAware {

    private final Logger logger = LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);

    private Environment environment;

    @Override
    public String getCronexpression(String cronName) {
        try {
            return environment.getProperty(cronName);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    // Refresh only the tasks whose configuration key has changed
    @EventListener
    public void environmentChangeEvent(EnvironmentChangeEvent event) {
        for (String key : event.getKeys()) {
            if (this.dynamicCronScheduleTaskManager.contains(key)) {
                this.dynamicCronScheduleTaskManager.updateTriggerTask(key);
            }
        }
    }
}
3.5 DynamicCronTrigger
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronSequenceGenerator;

import java.util.Date;

public class DynamicCronTrigger implements Trigger {

    private final static Logger LOGGER = LoggerFactory.getLogger(DynamicCronTrigger.class);

    private String cronName;
    private AbstractDynamicCronHandler dynamicCronHandler;
    private String cronexpression;
    private CronSequenceGenerator sequenceGenerator;

    public DynamicCronTrigger(String cronName, AbstractDynamicCronHandler dynamicCronHandler) {
        this.cronName = cronName;
        this.dynamicCronHandler = dynamicCronHandler;
    }

    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        // Look up the current expression on every call
        String cronexpression = dynamicCronHandler.getCronexpression(cronName);
        if (cronexpression == null) {
            return null;
        }
        // Re-parse only when the expression has changed
        if (this.sequenceGenerator == null || !cronexpression.equals(this.cronexpression)) {
            try {
                this.sequenceGenerator = new CronSequenceGenerator(cronexpression);
                this.cronexpression = cronexpression;
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        if (this.sequenceGenerator == null) {
            // The first expression could not be parsed, so there is nothing to schedule
            return null;
        }
        Date date = triggerContext.lastCompletionTime();
        if (date != null) {
            Date scheduled = triggerContext.lastScheduledExecutionTime();
            if (scheduled != null && date.before(scheduled)) {
                // Previous task apparently executed too early...
                // Let's simply use the last calculated execution time then,
                // in order to prevent accidental re-fires in the same second.
                date = scheduled;
            }
        } else {
            date = new Date();
        }
        return this.sequenceGenerator.next(date);
    }
}
3.6 Annotation class ScheduledDynamicCron
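import org.springframework.core.annotation.AliasFor;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduledDynamicCron {

    @AliasFor("cronName")
    String value() default "";

    @AliasFor("value")
    String cronName() default "";

    Class<? extends AbstractDynamicCronHandler> handler() default EnvironmentDynamicCronHandler.class;
}
3.7 DynamicScheduledAnnotationBeanPostProcessor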
import org.springframework.beans.factory.BeanFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

public class DynamicScheduledAnnotationBeanPostProcessor extends ScheduledAnnotationBeanPostProcessor {

    private StringValueResolver embeddedValueResolver;
    private BeanFactory beanFactory;
    private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
    private final ScheduledTaskRegistrar registrar = (ScheduledTaskRegistrar) getFieldValueFromParentClass("registrar");
    private final Map ... (the rest of this listing is missing from the source)
3.8 Configuration class SchedulerConfiguration
@EnableScheduling
public class SchedulerConfiguration implements SchedulingConfigurer {

    @Value("${app.scheduler.thread.count:10}")
    private int schedulerThreadCount;

    // Use a thread pool of configurable size for scheduled tasks
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        TaskScheduler taskScheduler = new ConcurrentTaskScheduler(new ScheduledThreadPoolExecutor(schedulerThreadCount));
        taskRegistrar.setTaskScheduler(taskScheduler);
    }

    @Bean
    public EnvironmentDynamicCronHandler environmentDynamicCronHandler() {
        return new EnvironmentDynamicCronHandler();
    }

    @Bean
    public DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager() {
        return new DynamicCronScheduleTaskManager();
    }

    // Replace the default post-processor with the dynamic one from section 3.7
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Primary
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor(DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager) {
        return new DynamicScheduledAnnotationBeanPostProcessor(dynamicCronScheduleTaskManager);
    }
}
4. Spring Boot usage examples
4.1 Configuration: import the configuration class SchedulerConfiguration in the application's main class
For example:
@SpringBootApplication
@Import(SchedulerConfiguration.class) // import the configuration class
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
4.2 Example: configuring the cron expression in Nacos
Nacos configuration:
cron:
  name1: "0/5 * * * * ?" # runs every 5 seconds
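Use @ScheduledDynamicCron to specify the configuration key of the cron expression, cron.name1. When handler() is not specified, EnvironmentDynamicCronHandler is used by default; it looks up the cron expression in Nacos by the given key cron.name1.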
@Component
public class DynamicTask {

    @Scheduled
    @ScheduledDynamicCron("cron.name1")
    public void dynamicCronForEnvironment() {
        System.out.println("dynamicCronForEnvironment:" + DateUtils.format(LocalDateTime.now()));
    }
}
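- @Scheduled is still required, but its cron attribute is ignored.
- Change cron.name1 in Nacos to 0/2 * * * * ? and publish it; the task immediately switches from running every 5 seconds to running every 2 seconds.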
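To see what the two expressions mean for the next fire time, the toy calculator below handles only a seconds field of the form 0/N (seconds 0, N, 2N, and so on within each minute). It is a deliberately narrow illustration with a made-up class name, not a cron parser, and it does not reflect how Spring evaluates expressions internally.

```java
import java.time.LocalTime;

// Toy calculator for the seconds field "0/N" only; real cron parsing is far more general.
class SecondsStep {

    // Next time strictly after `after` whose second is a multiple of `step` below 60.
    static LocalTime next(LocalTime after, int step) {
        int nextSecond = (after.getSecond() / step + 1) * step;
        LocalTime base = after.withNano(0);
        if (nextSecond >= 60) {
            // No matching second left in this minute: wrap to second 0 of the next minute.
            return base.withSecond(0).plusMinutes(1);
        }
        return base.withSecond(nextSecond);
    }

    public static void main(String[] args) {
        LocalTime now = LocalTime.of(12, 0, 3);
        System.out.println(next(now, 5)); // prints 12:00:05
        System.out.println(next(now, 2)); // prints 12:00:04
    }
}
```

Starting from 12:00:03, the task fires next at 12:00:05 under 0/5 and at 12:00:04 under 0/2.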
Extend AbstractDynamicCronHandler to look up the cron expression in a database:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DynamicCronHandler extends AbstractDynamicCronHandler {

    @Autowired
    private ParametersService parametersService;

    @Override
    public String getCronexpression(String cronName) {
        // Look up the cron expression stored in the database by cronName
        Parameters ap = parametersService.getByName(cronName);
        // Return null when no row exists; DynamicCronTrigger treats null as "nothing to schedule"
        return ap == null ? null : ap.getValue();
    }
}
Scheduled task class:
@Component
public class DynamicTask {

    @Scheduled
    // The handler must be set to the DynamicCronHandler defined above
    @ScheduledDynamicCron(cronName = "cron.name2", handler = DynamicCronHandler.class)
    public void dynamicCronForDb() {
        System.out.println("dynamicCronForDb:" + LocalDateTime.now());
    }
}
The trigger timing has to be refreshed whenever the database configuration is updated:
@RestController
@RequestMapping("parameters")
public class ParametersController {

    @Autowired
    private ParametersService parametersService;

    @Autowired
    private DynamicCronHandler dynamicCronHandler;

    @PostMapping("/update")
    public Result update(Parameters parameters) {
        if (parametersService.update(parameters)) {
            if ("cron.name2".equals(parameters.getName())) {
                // After updating the database configuration, refresh the task's trigger timing
                dynamicCronHandler.updateTriggerTask(parameters.getName());
            }
        }
        return Result.success();
    }
}
4.4 Updating scheduled tasks in a distributed cluster deployment
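The update above refreshes the trigger timing only on the node that handled the request; the other service nodes in the cluster are not updated.
The other nodes can be updated through a message bus: for example, broadcast a message over MQ, and have each node call the following method after consuming it to refresh the trigger timing: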
dynamicCronHandler.updateTriggerTask(cronName);
4.5 Adding a scheduled task
Web endpoint for adding a task:
@RestController
@RequestMapping("dynamicCron")
public class DynamicCronController {

    @Autowired
    private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;

    @Autowired
    private EnvironmentDynamicCronHandler environmentDynamicCronHandler;

    @PostMapping("/addTask")
    public Result addTask(String cronName) {
        // Create the Runnable to run on a schedule
        Runnable runnable = () -> System.out.println("run task:" + LocalDateTime.now());
        // Create the trigger, using EnvironmentDynamicCronHandler
        DynamicCronTrigger trigger = new DynamicCronTrigger(cronName, environmentDynamicCronHandler);
        // Add the scheduled task
        dynamicCronScheduleTaskManager.addTriggerTask(cronName, new TriggerTask(runnable, trigger));
        return Result.success();
    }
}
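After the endpoint call completes, the scheduled task does not run yet, because cron.name2 has not been configured. Once the cron expression is configured in Nacos, the task starts being scheduled.
Console output after configuring Nacos: (image omitted)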