资源链接:https://download.csdn.net/download/qq_45060540/65332556
GItee地址:https://gitee.com/hyxl-520/autojob.git
本框架主要功能是实现中心化的计划任务动态调度,可以动态实现计划任务的CURD,实现定时任务、周期性重复任务、立即执行任务等功能,并且集成应用层,可以通过REST结果实现任务的CURD以及模块核心线程的监测、重启、停止等功能。模块配置心跳检测器,可以最大程度的保证核心线程的运行,并且提供不同的心跳检测策略。模块提供完善配置,可以根据配置制定不同的调度方案。该模块由于包含应用层,可以很方便实现中心化管理、分布式拓展等。同时,本框架对于包含Autowired注入的变量的方法,如mapper、service等都能完美调用,前提是该方法所在的类也得纳入Spring的上下文统一管理。
架构设计系统分为三层架构:应用层、任务注册器、任务调度器、心跳检测器以及任务调度队列.
应用层:MVC架构,主要提供REST接口供用户提交、注册、停止、删除计划任务,以及管理任务调度器.
任务注册器:主要用于应用层对任务进行注册和注销。
任务调度器(核心):负责整个任务的调度、动态任务持久化、任务状态更新等核心功能。
任务调度队列:采用可阻塞队列实现,当任务队列已满时可以决定是否阻塞等待任务队列空出。
心跳检测器:负责监听任务调度器的核心线程,当监听到某个核心线程宕掉时,按照策略重启该线程,具体策略请见参数配置说明。
安装说明 1.创建数据库首先自己创建一个数据库,配置文件默认是auto_job,然后运行auto_job.sql创建任务表。
2.导入项目导入项目并下载相关依赖。
3.配置该项目属于spring-web项目,需要配置相关端口号,默认是8086。同样,你也需要配置本机的数据库的用户名、密码以及地址。以上项目配置完成后还需完成框架配置,具体配置信息见下面的配置参数。
4.入门当上面所有都配置完后,你就可以启动项目,本项目暂无前端UI层,但是提供了测试的Rest接口,你可以通过PostMan等测试工具进行测试
在URL地址栏输入:(POST)http://localhost:8086/auto_job/run_tasks_now
设置参数,选择body-row-json,输入以下参数:
{ "methodClass": "com.example.autojob.job.Jobs", "methodName": "lithenTask", "circle":5000, "repeatTimes":2, "isEndurance":false, "attributes":[ { "type":"string", "values":{ "value":"hello world" } }, { "type":"com.example.autojob.domain.Task", "values":{ "methodName":"这是一个测试方法名" } } ] }
如果成功执行你将会看到接口返回值
{ "code": 200, "data": [ { "id": 1639647582245, "startTime": "2021-12-16 17:39:41", "isStarted": 0, "isFinished": 0, "circle": 5000, "repeatTimes": 2, "finishedTimes": 0, "methodClassName": "com.example.autojob.job.Jobs", "methodName": "lithenTask", "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]", "isEndurance": false } ], "totalnum": 1, "message": "成功启动任务:lithenTask" }
你将会看到控制台输出(为了结果清楚,暂时关闭ERROR级别以下的输出)
lithenTask方法如下图
@Component public class Job{ //从上下文获取事件管理器 @Autowired TaskEventManage manager; public void lithenTask(String value,Task task) { //定义监听器 TaskListener5.解释listener1 = taskEvent -> System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "准备启动"); TaskListener listener2 = taskEvent -> { System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "已完成"); }; TaskListener listener3=taskEvent -> System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "成功执行完成"); //注册监听器,manager变量可以直接使用 manager.addTaskEventListener(listener1,TaskBeforeRunEvent.class); manager.addTaskEventListener(listener2,TaskFinishedEvent.class); manager.addTaskEventListener(listener3,TaskSuccessEvent.class); System.out.println("lithenTask开始执行"); System.out.println("传进来的参数:value="+value+",task.methodName="+task.getMethodName()); } }
参数的解释可以看“立即执行任务接口”,lithenTask首先注册了三个监听器,分别监听任务的启动前,已完成,已成功完成三个事件,并输出对应的事件信息,由于监听器的注册在方法内部,所以事件监听输出会延迟。我们可以发现调度的方法使用了manager这个全局变量,而且是Autowired注入的,该框架寻找方法所在的对象有三步,首先是在Spring容器里查找,如果不存在且方法有参数则按照方法名和参数列表寻找,否则就直接通过方法名寻找。Job上有component注解,所以能在Spring容器获取到,因此能使用Autowired的变量。
配置参数# 集中式任务调度平台配置 autoJob: task: # 任务调度队列最大任务数 maxTaskNum: 100 # 供任务执行的线程池最大线程数 maxThreadNum: 100 executor: # 调度器运行频率,调度器将按照该频率进行运转,该值过小可能会占用过多CPU资源:毫秒 runCircle: 1000 # 执行器在调度任务进入调度队列时,队列已满时的最大等待时长:毫秒 maxJoinWait: 5000 # 是否开启过长等待任务持久化,过长等待任务将先放入DB,保证任务队列的高效性 longTimetaskDB: false # 当longTimetaskDB=true时,等待时长超过该阈值的任务会进行持久化,并且调度器会以该参数为周期从DB进行调度:分钟 maxWait: 1 # BY_SUBMIT检测时为心跳提交周期,线程应严格按照该参数周期性提交心跳:秒 submitCircle: 10 # 是否周期性从数据库调度未完成任务进入任务调度队列 ifScedulingTasksFromDB: false # 从数据库调度任务进入任务调度队列的周期:分钟 scedulingTasksFromDBCircle: 1 # 每次调度任务的时间区间,从执行任务起往后的分钟数 scedulingTasksFromDBTime: 5 heartBeats: # 心跳检测线程池最大线程数 maxThread: 100 # 有两种心跳检测模式,默认第一种 # BY_SELF:心跳监测器自主式检测,这种检测方式可以简化需要监听的线程,但是心跳线程异常退出后可能无法自主启动心跳线程 # BY_SUBMIT:线程提交式心跳检测,线程按照一定周期提交心跳,如果提交失败说明心跳线程可能异常退出,被监听线程可以尝试启动心跳检测线程,这种方式监听器和被监听线程耦合度高,被监听线程会执行冗余工作 model: BY_SELF # BY_SUBMIT模式下有效,当被监听线程:在[理论提交心跳时间=心跳次数*心跳周期,理论提交心跳时间+repeatThreshold*提交心跳周期]区间内没有提交心跳时,监听器认为该线程已经退出,当ifRepeatTask=true时尝试重新运行被监听线程 repeatThreshold: 0.01 # 当检测到被监听线程异常停止时是否尝试重新启动,重新启动的任务将在心跳检测线程池执行 ifRepeatTask: true # 当检测到被监听线程异常停止时最大尝试启动次数,当ifRepeatTask=true时有效 maxRepeat: 5 # 心跳检测器检测周期:毫秒 listenCircle: 1000 # 是否打印记录被监听线程的提交心跳日志 isRecordSubmitHeart: false使用说明
项目架构如下
除了controller和service包下的组件,其他均是必须的,controller和service可以通过任务调度器和任务注册器提供的API自己编写,项目启动时,任务调度队列、任务调度器、心跳检测器、任务生命周期事件管理器、注册器在项目启动时均会自动注入到Spring上下文并运行,无需调用启动。
应用层REST接口说明 插入计划任务接口功能:插入一条新的计划任务到DB,该任务有两种触发方式:一种是通过注册任务接口注册进调度队列等待调用,一种是开启周期性调度任务功能:ifScedulingTasksFromDB=true
URL:POST:/auto_job/insert_task
参数示列
{ "startTime": "2021-12-04 14:00:00", //计划启动时间(必选) "methodClass": "com.sccl.modules.job.conf.AutoJob", //任务所在类路径(必选) "methodName": "test6", //方法名(必选) "circle":70000, //运行周期(可选)默认0 "repeatTimes":3, //重复执行次数(可选)默认1 "isEndurance":true, //如果启动时间距现在的时间超过阈值是否进行持久化,当配置longTimetaskDB=true时生效,默认是true //方法参数,支持两种类型参数,string|Object,应该严格按照计划任务方法的参数列表顺序给出 "attributes":[ //参数为对象时type为该参数的类路径,values为该参数的某些属性,这些属性必须有相应的set方法,为了抑止“套娃行为”,set方法的参数必须为string { "type":"com.sccl.autojob.domain.Task", "values":{ "startTime":"123" } }, //参数为字符串时,values只需要一个属性:value:"字符串内容" { "type":"string", "values":{ "value":"123" } } ] }
返回值:
{ "code": 200, "totalnum": 1, "message": "插入定时任务成功" }查询计划任务接口
URL:GET:/auto_job/select_tasks_id?IDS=?
参数列表
IDS=75//多个id请用逗号分隔
返回值
{ "code": 200, "data": [ { "id": 75, "startTime": "2021-12-17 12:00:00", "isStarted": 0, "isFinished": 0, "circle": 5000, "repeatTimes": 2, "methodClassName": "com.example.autojob.job.Jobs", "methodName": "lithenTask", "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]" } ], "totalnum": 1, "message": "查询成功" }注册任务接口
功能说明:从DB获取任务插入到调度队列供任务调度器动态调度
URL:GET:/auto_job/register_tasks?IDS=?&WAIT_TIME=?
参数列表
IDS=75//多个id请用逗号分隔 WAIT_TIME=1000(ms)//可选参数,默认为0:如果调度队列已满,应该等待的时长
返回值
{ "code": 200, "totalnum": 1, "message": "注册任务到调度队列成功" }删除任务接口
功能说明:逻辑删除DB中指定计划任务
URL:GET:/auto_job/delete_tasks_ids?IDS=?
参数列表
IDS=74//多个id请用逗号分隔
返回值
{ "code": 200, "totalnum": 1, "message": "删除成功" }强制停止正在运行中的计划任务接口
功能:强制停止正在执行中的计划任务,如果存在的话
URL:GET:/auto_job/stop_tasks_id?IDS=?
参数列表
IDS=111,222//多个id请用逗号分隔
返回值`
{ "code": 200, "data": [ { "id": 1639707039802, "isFinished": 1, "isSuccess": 0, "finishTime": "2021-12-17 10:10:44" } ], "totalnum": 1, "message": "强制停止任务成功" }查询已完成的所有任务接口
功能:字面意思
URL:GET:/auto_job/select_finished_tasks
返回值
{ "code": 200, "data": [ { "id": 76, "startTime": "2021-12-17 10:46:20", "isStarted": 0, "isFinished": 1, "finishTime": "2021-12-17 10:46:32", "methodClassName": "com.example.autojob.job.Jobs", "methodName": "waitting" } ], "totalnum": 1, "message": "查询成功" }立即执行计划任务接口
功能说明:立即执行指定的计划任务
URL:POST:/auto_job/run_tasks_now
参数示列
{ "methodClass": "com.sccl.modules.job.conf.AutoJob", //任务所在类路径(必选) "methodName": "test6", //方法名(必选) "circle":70000, //运行周期(可选)默认0 "repeatTimes":3, //重复执行次数(可选)默认1 "isEndurance":true, //如果启动时间距现在的时间超过阈值是否进行持久化,当配置longTimetaskDB=true时生效,默认是true //方法参数,支持两种类型参数,string|Object,应该严格按照计划任务方法的参数列表顺序给出 "attributes":[ //参数为对象时type为该参数的类路径,values为该参数的某些属性,这些属性必须有相应的set方法,为了抑止“套娃行为”,set方法的参数必须为string { "type":"com.sccl.autojob.domain.Task", "values":{ "startTime":"123" } }, //参数为字符串时,values只需要一个属性:value:"字符串内容" { "type":"string", "values":{ "value":"123" } } ] }
返回值
{ "code": 200, "data": [ { "id": 1639709611149, "startTime": "2021-12-17 10:53:30", "isStarted": 0, "isFinished": 0, "circle": 5000, "repeatTimes": 1, "finishedTimes": 0, "methodClassName": "com.example.autojob.job.Jobs", "methodName": "waitting", "isEndurance": false } ], "totalnum": 1, "message": "成功启动任务:waitting" }获取正在任务队列中的所有任务接口
功能说明:包含正在执行和等待执行的所有任务,有时可能任务完成调度器还没监听到任务完成,也包含部分执行了的任务。
URL:GET:/auto_job/get_waiting_running_tasks
返回值
{ "code": 200, "data": [ { "id": 75, "startTime": "2021-12-17 12:00:00", "isStarted": 0, "isFinished": 0, "circle": 5000, "repeatTimes": 2, "methodClassName": "com.example.autojob.job.Jobs", "methodName": "lithenTask", "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]" } ], "totalnum": 1, "message": "查询成功" }获取任务调度器核心线程运行状态接口
功能说明:字面意思
URL:GET:/auto_job/get_executor_status
返回值示列:
{ "code": 200, "data": [ { "ThreadName": "autoUpdateFinishedListener", "status": "running" }, { "ThreadName": "startTimeListener", "status": "running" }, { "ThreadName": "taskFinishListener", "status": "running" }, { "ThreadName": "autoScedulingTaskToDB", "status": "running" } ], "message": "success" }重启任务调度器接口
功能说明:关闭任务调度器的所有核心线程并重启,并且在心跳检测器重新注册心跳,之前的心跳信息将会全部清空。此 *** 作不会删除任务调度队列中的待执行任务,也不会终止正在执行的计划任务
URL:GET:/auto_job/restart_executor
返回值
{ "code": 200, "message": "重启任务调度器成功" }
控制台输出
autoUpdateFinishedListener的心跳注销成功 关闭线程:autoUpdateFinishedListener成功 startTimeListener的心跳注销成功 关闭线程:startTimeListener成功 taskFinishListener的心跳注销成功 关闭线程:taskFinishListener成功 执行器初始化...... 执行器状态:{maxThreads=100, runCircle=1000, maxWaitTime=1, maxJoinWait=5000, longTimetaskDB=false, ifScedulingTasksFromDB=false, scedulingTasksFromDBCircle=1, scedulingTasksFromDBTime=5} 注册autoUpdateFinishedListener的心跳已完成 启动线程:autoUpdateFinishedListener成功 注册startTimeListener的心跳已完成 启动线程:startTimeListener成功 注册taskFinishListener的心跳已完成 启动线程:taskFinishListener成功 =============================================>自动更新完成任务监听器启动 =============================================>定时任务监听器启动 =============================================>任务关闭监听器启动启动任务调度器接口
功能说明:如果任务调度器有某些核心线程未启动则启动
URL:GET:/auto_job/start_executor
返回值
{ "code": 200, "message": "启动任务调度器成功" }强制停止任务调度器接口
功能说明:关闭任务调度器的所有核心线程,并且在心跳检测器注销心跳,之前的心跳信息将会全部清空。此 *** 作不会删除任务调度队列中的待执行任务,也不会终止正在执行的计划任务
URL:GET:/auto_job/stop_executor
返回值
{ "code": 200, "message": "关闭任务调度器成功" }获取心跳检测器状态接口
功能说明:字面意思
URL:GET:/auto_job/get_heart_beat_status
返回值示列:
{ "code": 200, "data": { "model": "BY_SELF", "status": "running" }, "message": "success" }重启心跳检测器接口
功能说明:注销心跳检测器并且重新启动,该 *** 作不会注销线程的登记信息
URL:GET:/auto_job/get_heart_beat_status
返回值
{ "code": 200, "message": "重启心跳检测器成功" }
控制台输出
心跳检测器初始化...... 心跳检测器状态:采用BY_SUBMIT心跳检测模式,{maxThreads=100, submitTime=BY_SUBMIT', repeatDoor=0.01, ifRepeatTask=true, maxRepeat=5, isRecordSubmitHeart=false, listenCircle=1000} =============================================>线程提交式心跳检测器已启动
_status
返回值示列:
{ "code": 200, "data": { "model": "BY_SELF", "status": "running" }, "message": "success" }重启心跳检测器接口
功能说明:注销心跳检测器并且重新启动,该 *** 作不会注销线程的登记信息
URL:GET:/auto_job/get_heart_beat_status
返回值
{ "code": 200, "message": "重启心跳检测器成功" }
控制台输出
心跳检测器初始化...... 心跳检测器状态:采用BY_SUBMIT心跳检测模式,{maxThreads=100, submitTime=BY_SUBMIT', repeatDoor=0.01, ifRepeatTask=true, maxRepeat=5, isRecordSubmitHeart=false, listenCircle=1000} =============================================>线程提交式心跳检测器已启动 资源链接:https://download.csdn.net/download/qq_45060540/65332556 GItee地址:https://gitee.com/hyxl-520/autojob.git
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)