目录
1.问题发现与描述
2.springboot定时任务的注册
3.springboot定时任务的调度与执行
4.总结
1.问题发现与描述
生产上一个服务定时从上游系统拉取数据,突然任务停止了,不再继续执行了。查看日志发现报了异常,springboot的数据库连接池不够了。但是连接池不够为什么会影响到定时任务呢?下面是破案全过程。
@Configuration
@EnableScheduling
public class DynamicScheduleAsyncTask implements SchedulingConfigurer {
@Autowired
private QueryRepository queryRepository;
@Autowired
private AsyncTimerTask timerTask;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(timerTask,
triggerContext -> {
//2.1 从数据库获取执行周期
String cron = queryRepository.queryCron();
//2.3 返回执行周期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
这是配置定时任务的代码,我们要支持动态调整cron周期,所以选择了triggerTask。这个是springboot定时任务的基本配置与使用。接下来我们跟踪源码解决问题。
2.springboot定时任务的注册首先我们在配置类中加上了@EnableScheduling注解。这个注解向spring容器注入了一个定时任务的自动配置类。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
我们再来看看SchedulingConfiguration这个配置类做了什么。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
很明显这个配置类向spring容器中注入了一个后置处理器ScheduledAnnotationBeanPostProcessor。所谓后置处理器就是指spring在初始化的过程中会回调这些类的某些方法。接下来走进这个后置处理器的postProcessAfterInitialization方法。
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
//找到所有标记了@Scheduled注解的方法
Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup>) method -> {
Set scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
//processScheduled这个方法中将当前任务放进集合
annotatedMethods.forEach((method, scheduledAnnotations) ->
scheduledAnnotations.forEach(scheduled ->
processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
3.springboot定时任务的调度与执行
上边提到的ScheduledAnnotationBeanPostProcessor这个后置处理器还有一个作用,它实现了ApplicationListener,顾名思义它还是个监听器,spring启动完成后回调所有监听器的onApplicationEvent方法,也正是在这个方法中完成了任务的调度与执行。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
private void finishRegistration() {
this.registrar.afterPropertiesSet();
}
this.registrar默认是ScheduledTaskRegistrar。我们接着往下看:
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
//创建任务执行器 默认是ConcurrentTaskScheduler
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//triggerTasks不为空 我们直接看scheduleTriggerTask方法
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
@Nullable
public ScheduledTask scheduleTriggerTask(TriggerTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
//scheduledTask为null new一个
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
//taskScheduler默认实现是ConcurrentTaskScheduler,我们接着看它里边的schedule方法
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addTriggerTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
@Override
@Nullable
public ScheduledFuture> schedule(Runnable task, Trigger trigger) {
try {
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
//进入到这个分支 我们直接看ReschedulingRunnable里边的schedule方法
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}
@Nullable
public ScheduledFuture> schedule() {
synchronized (this.triggerContextMonitor) {
//回调我们的获取cron表达式的方法 计算下次执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
//计算出距离下次执行的毫秒值
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
//this.executor是JDK的延时线程池DelegatedScheduledExecutorService
//重要一点:第一个入参就是要被执行的任务,也就是当前对象ReschedulingRunnable
//所以延时时间到了之后,JDK线程池就会调用当前对象的run方法。
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
@Override
public void run() {
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
//这里就是回调了我们自定义的任务方法
super.run();
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
//上述过程都是计算和更新下次执行任务的时间 虽然重要但是不是本次分析重点
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
//重点在这儿:这里又调用了这个schedule方法
//那么代码又回到了上边schedule方法中。
schedule();
}
}
}
4.总结
总结一下:其实道理很简单,所有的问题都出现在ReschedulingRunnable这个类中。
1.schedule方法中从数据库获取cron表达式计算延时时间,然后将自己作为任务对象放到JDK的延时线程池中。
2.JDK延时线程池执行任务时调用当前对象的run方法。
3.当前对象的run方法再次计算延时时间,调用schedule方法。
很明显,上述三个步骤形成了一个循环,正常情况下我们的任务就会一次次的执行下去直到JVM退出。但是文章开头提到,我们的数据库连接池不够了,在第一步从数据库获取cron表达式的时候抛出了异常,导致schedule方法执行到第一行就结束了。也就是说没有将任务放到JDK的线程池中,所以后续的run方法也就不会执行了。这个循环也就结束了。
截止到目前我们已经将问题的原委全部弄通,怎们解决这个问题合适呢?
第一我们调整了hikari连接池的参数,核心连接数由默认10调到了30.
第二我们调整了hikari连接的超时时间,由默认的3000ms调整到了6000ms.
第三我们在获取cron表到式的时候通过try catch将异常捕获,如果数据库异常我们给个默认的cron即可.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)