在开发过程中,我们会用定时任务来执行一些 *** 作,例如定时去捞取流水重试业务、定时去消息中间件获取消息等等相关需求
简单的定时任务实现可以借助Spring提供的 @Scheduled 注解 详细看 Spring 原理之 Scheduled
如果涉及到 定时任务的动态管理就需要使用到其他技术,下面介绍一下Quartz
Quartz是一个开源的任务日程管理系统, 由 OpenSymphony开源,同时它是一个功能丰富的任务调用系统,可创建简单或者复杂的几十、几百、甚至成千上万的job。除此之外,quartz调度器还支持JTA事务和集群。
Maven 的pom.xml文件引入相关依赖包
<dependency>
<groupId>org.quartz-schedulergroupId>
<artifactId>quartzartifactId>
<version>2.3.2version>
dependency>
Quarz原理
本篇博客接着上两篇
- Java - Quarz 定时任务
- Java - Quarz 定时任务_使用注意点
这篇主要说一下 Quartz 的执行原理,在JobDetail 与 Trigger 执行好了之后,都是 通过 Scheduler 来完成任务的执行
前言 1、数据存储Quartz 对于 trigger 和 job 的存储下来主要有两种存储方式:
- RAMJobStore:将 trigger 和 job 存储在内存中,存取速度快,但是系统被停止后数据会丢失
- JobStoreSupport:基于 jdbc 将 trigger 和 job 存储到数据库中,存储速度慢一些但保证数据不丢失
在 Quarz 的默认配置文件中,存储默认为 RAMJobStore,若想更改存储方式,可以新增相应的 JobStore 配置与数据库配置
example:
#==============================================================
#Configure JobStore
#==============================================================*
# 指定 class JobStoreTX(自己管理事务) / JobStoreCMT(依赖于容器来进行事务的管理)
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
# 驱动代理
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 数据库表的前缀
org.quartz.jobStore.tablePrefix = QRTZ_
# 是否集群
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000
org.quartz.jobStore.txIsolationLevelSerializable = false
#==============================================================
#Configure DataSource
#==============================================================
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = 你的数据链接
org.quartz.dataSource.myDS.user = 用户名
org.quartz.dataSource.myDS.password = 密码
org.quartz.dataSource.myDS.maxConnections = 30
org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE
说明:上述配置中,JobStoreSupport 使用了一个驱动代理 StdJDBCDelegate 来 *** 作 trigger 和 job 的数据存储。其实现了大部分基于标准 JDBC 的功能接口,但是对于各种数据库来说,需要根据其具体实现的特点做某些特殊处理,所以 StdJDBCDelegate 有很多实现类来完成这些特殊处理,根据需求也可以使用任意一个驱动代理
2、线程Quartz 中,有两类线程:
- 调度线程:
- 执行 常规调度 的线程 QuartzSchedulerThread:轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务
- 执行 misfired trigger 的线程 MisfireHandler:扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理
- 任务执行线程:通常使用一个线程池维护一组线程,默认是使用 SimpleThreadPool 线程池
misfired job 是因为一些原因产生了 misfired 任务,如下:
- 系统被重启,有些任务在系统在关闭到重新启动中的时间内可能会被 misfire;
- 有些任务在 Trigger 被暂停的一段时间里可能会被 misfire;
- 线程池中线程都被占用,任务无法再被触发执行,造成 misfire;
- 有状态任务在下次触发时间到达时,上次执行还没有结束;为了处理 misfired job,Quartz 中为 trigger 定义了处理策略,主要有下面两种:
- MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:针对 misfired job 马上执行一次;
- MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次触发;默认是
当 Quartz 与 Spring 应用集成时,服务器启动会装载相关的 Bean,即 SchedulerFactoryBean,其实现了 InitializingBean 接口,在初始化Bean时会执行afterPropertiesSet 方法,该方法将会创建 Scheduler
Scheduler 两个具体的创建工厂类:DirectSchedulerFactory和 StdSchedulerFactory
- StdSchedulerFactory 与 DirectSchedulerFactory 创建出的Scheduler (createScheduler方法 )的类型都是 org.quartz.impl.StdScheduler
- DirectSchedulerFactory 创建出的Scheduler (createRemoteScheduler方法)的类型是 org.quartz.impl.RemoteScheduler
一般情况下,通常用 StdSchedulerFactory 来创建 Bean(下面的原理以 StdSchedulerFactory 创建 Scheduler 为 讲解 )
SchedulerFactory 在创建 Scheduler 的过程中,会读取配置文件的相关配置信息,从而来初始化各个组件,若不自定义,默认的为其自带的 quartz.properties 文件,相关默认配置如下:
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
# 线程池的默认配置,配置前缀为 org.quartz.threadPool
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
说明:(若不采用上述默认的配置,建立 org.quartz.properties 文件自定义相关key即可)
- 默认instanceName 为 DefaultQuartzScheduler
- 默认的线程池采用 SimpleThreadPool 线程池,线程个数为 10 ,优先级为5
- 默认的 jobStore 存储为 RAMJobStore
创建 Scheduler 的过程中根据配置文件的相关配置,若有相关配置会初始化一些组件,无相关配置则以默认的配置进行初始化,一些组件如下:
- org.quartz.impl.StdSchedulerFactory#instantiate
- ThreadPool:根据配置文件中 org.quartz.threadPool.class 进行配置,线程池内部配置获取以 org.quartz.threadPool 为前缀的配置进行初始化, 默认使用 SimpleThreadPool(org.quartz.simpl.SimpleThreadPool)其创建了一定数量的 WorkerThread 实例来使得 Job 能够在线程中进行处理,其中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;
- JobStore:分别为 RAMJobStore 和 JobStoreSupport ,默认使用 RAMJobStore (内存存储),可通过配置文件中 org.quartz.jobStore.class 来实现自定义
- QuartzSchedulerThread:创建出的 QuartzScheduler 中的 schedThread ,用来进行任务调度,在初始化的时候 paused=true, halted=false,这个线程已经准备就绪,因其paused=true,所以线程会一直等待,直到 start 方法将paused置为false;
整个启动流程如下图(默认配置):
说明:
- 先读取配置文件,读取系统的 org.quartz.properties 文件,若没有,读取自带的 quartz.properties 文件,采用默认配置来进行初始化
- 初始化 SchedulerFactoryBean,InitializingBean 方法中 初始化 SchedulerFactory
- 调用 **getScheduler **方法触发 StdSchedulerFactory 的 instantiate 方法进行实例化
- 实例化 执行线程池,默认线程池为 SimpleThreadPool ,可通过配置文件中 org.quartz.threadPool.class 自定义
- 实例化数据存储,默认为 RAMJobStore (内存存储),可通过配置文件中 org.quartz.jobStore.class 自定义
- 初始化QuartzScheduler,new一个QuartzSchedulerThread调度线程,并开始运行,初始字段 paused=true, halted=false
- 调度开始,注册监听器,注册Job和Trigger
- 调用 start() 方法,改变 paused=false,执行调度线程
Scheduler 具体常用的一些方法:
方法名称 | 描述 |
---|---|
start startDelayed(int) isStarted | 开始 Scheduler 线程、立即开启/延迟开启、是否开启线程 |
shutdown shutdown(boolean) isShutdown | 关闭Scheduler线程,立即关闭/等待所有job执行完关闭、是否关闭线程 |
scheduleJob | 设置定时任务 指定 JobDetail 与 trigger |
triggerJob | 立即执行定时任务 |
unscheduleJob | 去除job的Trigger |
rescheduleJob | 去除job的Trigger,重新配置新Trigger(之前的定时任务配置被删除) |
resumeJob | 恢复执行定时任务 |
pauseJob | 暂停执行定时任务 |
deleteJob | 删除定时任务 |
上述说到 org.quartz.impl.StdScheduler 里面调用的都是 org.quartz.core.QuartzScheduler 的方法,所以核心的业务逻辑与原理都在该类中实现,包括上面的常用方法,均是通过调用 org.quartz.core.QuartzScheduler 的对应方法来实现
总览:
说明:
- 一些方法对应上述表格中的各个方法的实现,实现逻辑与原理相似
- 通过调用关联的 JobStore(即 RAMJobStore 和 JobStoreSupport ) 里面的方法来具体实现
关于后续的流程还是有待继续探讨的~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)