可集成式计划任务动态调度框架设计说明

可集成式计划任务动态调度框架设计说明,第1张

可集成式计划任务动态调度框架设计说明 可集成式计划任务动态调度框架设计说明

资源链接:https://download.csdn.net/download/qq_45060540/65332556
GItee地址:https://gitee.com/hyxl-520/autojob.git

功能说明

本框架主要功能是实现中心化的计划任务动态调度,可以动态实现计划任务的CURD,实现定时任务、周期性重复任务、立即执行任务等功能,并且集成应用层,可以通过REST结果实现任务的CURD以及模块核心线程的监测、重启、停止等功能。模块配置心跳检测器,可以最大程度的保证核心线程的运行,并且提供不同的心跳检测策略。模块提供完善配置,可以根据配置制定不同的调度方案。该模块由于包含应用层,可以很方便实现中心化管理、分布式拓展等。同时,本框架对于包含Autowired注入的变量的方法,如mapper、service等都能完美调用,前提是该方法所在的类也得纳入Spring的上下文统一管理。

架构设计

系统分为三层架构:应用层、任务注册器、任务调度器、心跳检测器以及任务调度队列.

应用层:MVC架构,主要提供REST接口供用户提交、注册、停止、删除计划任务,以及管理任务调度器.

任务注册器:主要用于应用层对任务进行注册和注销。

任务调度器(核心):负责整个任务的调度、动态任务持久化、任务状态更新等核心功能。

任务调度队列:采用可阻塞队列实现,当任务队列已满时可以决定是否阻塞等待任务队列空出。

心跳检测器:负责监听任务调度器的核心线程,当监听到某个核心线程宕掉时,按照策略重启该线程,具体策略请见参数配置说明。

安装说明 1.创建数据库

首先自己创建一个数据库,配置文件默认是auto_job,然后运行auto_job.sql创建任务表。

2.导入项目

导入项目并下载相关依赖。

3.配置

该项目属于spring-web项目,需要配置相关端口号,默认是8086。同样,你也需要配置本机的数据库的用户名、密码以及地址。以上项目配置完成后还需完成框架配置,具体配置信息见下面的配置参数。

4.入门

当上面所有都配置完后,你就可以启动项目,本项目暂无前端UI层,但是提供了测试的Rest接口,你可以通过PostMan等测试工具进行测试

在URL地址栏输入:(POST)http://localhost:8086/auto_job/run_tasks_now

设置参数,选择body-row-json,输入以下参数:

{
    "methodClass": "com.example.autojob.job.Jobs",
    "methodName": "lithenTask",
    "circle":5000,
    "repeatTimes":2,
    "isEndurance":false,
    "attributes":[
        {
            "type":"string",
            "values":{
                "value":"hello world"
            }
        },
        {
            "type":"com.example.autojob.domain.Task",
            "values":{
                "methodName":"这是一个测试方法名"
            }
        }
    ]
}

如果成功执行你将会看到接口返回值

{
    "code": 200,
    "data": [
        {
            "id": 1639647582245,
            "startTime": "2021-12-16 17:39:41",
            "isStarted": 0,
            "isFinished": 0,
            "circle": 5000,
            "repeatTimes": 2,
            "finishedTimes": 0,
            "methodClassName": "com.example.autojob.job.Jobs",
            "methodName": "lithenTask",
            "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]",
            "isEndurance": false
        }
    ],
    "totalnum": 1,
    "message": "成功启动任务:lithenTask"
}

你将会看到控制台输出(为了结果清楚,暂时关闭ERROR级别以下的输出)

lithenTask方法如下图

@Component
public class Job{
    //从上下文获取事件管理器
    @Autowired
    TaskEventManage manager;
    
    public void lithenTask(String value,Task task) {
        //定义监听器
        TaskListener listener1 = taskEvent -> System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "准备启动");
        TaskListener listener2 = taskEvent -> {
            System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "已完成");
        };
        TaskListener listener3=taskEvent -> System.out.println("检测到任务:" + taskEvent.getTask().getMethodName() + "成功执行完成");
        //注册监听器,manager变量可以直接使用
        manager.addTaskEventListener(listener1,TaskBeforeRunEvent.class);
        manager.addTaskEventListener(listener2,TaskFinishedEvent.class);
        manager.addTaskEventListener(listener3,TaskSuccessEvent.class);
        System.out.println("lithenTask开始执行");
        System.out.println("传进来的参数:value="+value+",task.methodName="+task.getMethodName());
    }
}
5.解释

参数的解释可以看“立即执行任务接口”,lithenTask首先注册了三个监听器,分别监听任务的启动前,已完成,已成功完成三个事件,并输出对应的事件信息,由于监听器的注册在方法内部,所以事件监听输出会延迟。我们可以发现调度的方法使用了manager这个全局变量,而且是Autowired注入的,该框架寻找方法所在的对象有三步,首先是在Spring容器里查找,如果不存在且方法有参数则按照方法名和参数列表寻找,否则就直接通过方法名寻找。Job上有component注解,所以能在Spring容器获取到,因此能使用Autowired的变量。

配置参数
# 集中式任务调度平台配置
autoJob:
  task:
    # 任务调度队列最大任务数
    maxTaskNum: 100
    # 供任务执行的线程池最大线程数
    maxThreadNum: 100
  executor:
    # 调度器运行频率,调度器将按照该频率进行运转,该值过小可能会占用过多CPU资源:毫秒
    runCircle: 1000
    # 执行器在调度任务进入调度队列时,队列已满时的最大等待时长:毫秒
    maxJoinWait: 5000
    
    # 是否开启过长等待任务持久化,过长等待任务将先放入DB,保证任务队列的高效性
    longTimetaskDB: false
    # 当longTimetaskDB=true时,等待时长超过该阈值的任务会进行持久化,并且调度器会以该参数为周期从DB进行调度:分钟
    maxWait: 1
    # BY_SUBMIT检测时为心跳提交周期,线程应严格按照该参数周期性提交心跳:秒
    submitCircle: 10
    
    # 是否周期性从数据库调度未完成任务进入任务调度队列
    ifScedulingTasksFromDB: false
    
    # 从数据库调度任务进入任务调度队列的周期:分钟
    scedulingTasksFromDBCircle: 1
    # 每次调度任务的时间区间,从执行任务起往后的分钟数
    scedulingTasksFromDBTime: 5
  heartBeats:
  	# 心跳检测线程池最大线程数
    maxThread: 100
    
    # 有两种心跳检测模式,默认第一种
    # BY_SELF:心跳监测器自主式检测,这种检测方式可以简化需要监听的线程,但是心跳线程异常退出后可能无法自主启动心跳线程
    # BY_SUBMIT:线程提交式心跳检测,线程按照一定周期提交心跳,如果提交失败说明心跳线程可能异常退出,被监听线程可以尝试启动心跳检测线程,这种方式监听器和被监听线程耦合度高,被监听线程会执行冗余工作
    model: BY_SELF
    # BY_SUBMIT模式下有效,当被监听线程:在[理论提交心跳时间=心跳次数*心跳周期,理论提交心跳时间+repeatThreshold*提交心跳周期]区间内没有提交心跳时,监听器认为该线程已经退出,当ifRepeatTask=true时尝试重新运行被监听线程
    repeatThreshold: 0.01
    
    # 当检测到被监听线程异常停止时是否尝试重新启动,重新启动的任务将在心跳检测线程池执行
    ifRepeatTask: true
    # 当检测到被监听线程异常停止时最大尝试启动次数,当ifRepeatTask=true时有效
    maxRepeat: 5
    # 心跳检测器检测周期:毫秒
    listenCircle: 1000
    # 是否打印记录被监听线程的提交心跳日志
    isRecordSubmitHeart: false
使用说明

项目架构如下

除了controller和service包下的组件,其他均是必须的,controller和service可以通过任务调度器和任务注册器提供的API自己编写,项目启动时,任务调度队列、任务调度器、心跳检测器、任务生命周期事件管理器、注册器在项目启动时均会自动注入到Spring上下文并运行,无需调用启动。

应用层REST接口说明 插入计划任务接口

功能:插入一条新的计划任务到DB,该任务有两种触发方式:一种是通过注册任务接口注册进调度队列等待调用,一种是开启周期性调度任务功能:ifScedulingTasksFromDB=true

URL:POST:/auto_job/insert_task

参数示列

{
    "startTime": "2021-12-04 14:00:00", //计划启动时间(必选)
    "methodClass": "com.sccl.modules.job.conf.AutoJob", //任务所在类路径(必选)
    "methodName": "test6", //方法名(必选)
    "circle":70000, //运行周期(可选)默认0
    "repeatTimes":3, //重复执行次数(可选)默认1
    "isEndurance":true, //如果启动时间距现在的时间超过阈值是否进行持久化,当配置longTimetaskDB=true时生效,默认是true
    //方法参数,支持两种类型参数,string|Object,应该严格按照计划任务方法的参数列表顺序给出
    "attributes":[
        //参数为对象时type为该参数的类路径,values为该参数的某些属性,这些属性必须有相应的set方法,为了抑止“套娃行为”,set方法的参数必须为string
        {
            "type":"com.sccl.autojob.domain.Task",
            "values":{
                "startTime":"123"   
            }
        },
        //参数为字符串时,values只需要一个属性:value:"字符串内容"
        {
            "type":"string",
            "values":{
                "value":"123"   
            }
        }
    ]
}

返回值:

{
    "code": 200,
    "totalnum": 1,
    "message": "插入定时任务成功"
}
查询计划任务接口

URL:GET:/auto_job/select_tasks_id?IDS=?

参数列表

IDS=75//多个id请用逗号分隔

返回值

{
    "code": 200,
    "data": [
        {
            "id": 75,
            "startTime": "2021-12-17 12:00:00",
            "isStarted": 0,
            "isFinished": 0,
            "circle": 5000,
            "repeatTimes": 2,
            "methodClassName": "com.example.autojob.job.Jobs",
            "methodName": "lithenTask",
            "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]"
        }
    ],
    "totalnum": 1,
    "message": "查询成功"
}
注册任务接口

功能说明:从DB获取任务插入到调度队列供任务调度器动态调度

URL:GET:/auto_job/register_tasks?IDS=?&WAIT_TIME=?

参数列表

IDS=75//多个id请用逗号分隔
WAIT_TIME=1000(ms)//可选参数,默认为0:如果调度队列已满,应该等待的时长

返回值

{
    "code": 200,
    "totalnum": 1,
    "message": "注册任务到调度队列成功"
}
删除任务接口

功能说明:逻辑删除DB中指定计划任务

URL:GET:/auto_job/delete_tasks_ids?IDS=?

参数列表

IDS=74//多个id请用逗号分隔

返回值

{
    "code": 200,
    "totalnum": 1,
    "message": "删除成功"
}
强制停止正在运行中的计划任务接口

功能:强制停止正在执行中的计划任务,如果存在的话

URL:GET:/auto_job/stop_tasks_id?IDS=?

参数列表

IDS=111,222//多个id请用逗号分隔

返回值`

{
    "code": 200,
    "data": [
        {
            "id": 1639707039802,
            "isFinished": 1,
            "isSuccess": 0,
            "finishTime": "2021-12-17 10:10:44"
        }
    ],
    "totalnum": 1,
    "message": "强制停止任务成功"
}
查询已完成的所有任务接口

功能:字面意思

URL:GET:/auto_job/select_finished_tasks

返回值

{
    "code": 200,
    "data": [
        {
            "id": 76,
            "startTime": "2021-12-17 10:46:20",
            "isStarted": 0,
            "isFinished": 1,
            "finishTime": "2021-12-17 10:46:32",
            "methodClassName": "com.example.autojob.job.Jobs",
            "methodName": "waitting"
        }
    ],
    "totalnum": 1,
    "message": "查询成功"
}
立即执行计划任务接口

功能说明:立即执行指定的计划任务

URL:POST:/auto_job/run_tasks_now

参数示列

{
    "methodClass": "com.sccl.modules.job.conf.AutoJob", //任务所在类路径(必选)
    "methodName": "test6", //方法名(必选)
    "circle":70000, //运行周期(可选)默认0
    "repeatTimes":3, //重复执行次数(可选)默认1
    "isEndurance":true, //如果启动时间距现在的时间超过阈值是否进行持久化,当配置longTimetaskDB=true时生效,默认是true
    //方法参数,支持两种类型参数,string|Object,应该严格按照计划任务方法的参数列表顺序给出
    "attributes":[
        //参数为对象时type为该参数的类路径,values为该参数的某些属性,这些属性必须有相应的set方法,为了抑止“套娃行为”,set方法的参数必须为string
        {
            "type":"com.sccl.autojob.domain.Task",
            "values":{
                "startTime":"123"   
            }
        },
        //参数为字符串时,values只需要一个属性:value:"字符串内容"
        {
            "type":"string",
            "values":{
                "value":"123"   
            }
        }
    ]
}

返回值

{
    "code": 200,
    "data": [
        {
            "id": 1639709611149,
            "startTime": "2021-12-17 10:53:30",
            "isStarted": 0,
            "isFinished": 0,
            "circle": 5000,
            "repeatTimes": 1,
            "finishedTimes": 0,
            "methodClassName": "com.example.autojob.job.Jobs",
            "methodName": "waitting",
            "isEndurance": false
        }
    ],
    "totalnum": 1,
    "message": "成功启动任务:waitting"
}
获取正在任务队列中的所有任务接口

功能说明:包含正在执行和等待执行的所有任务,有时可能任务完成调度器还没监听到任务完成,也包含部分执行了的任务。

URL:GET:/auto_job/get_waiting_running_tasks

返回值

{
    "code": 200,
    "data": [
        {
            "id": 75,
            "startTime": "2021-12-17 12:00:00",
            "isStarted": 0,
            "isFinished": 0,
            "circle": 5000,
            "repeatTimes": 2,
            "methodClassName": "com.example.autojob.job.Jobs",
            "methodName": "lithenTask",
            "attributes": "[{"type":"string","values":{"value":"hello world"}},{"type":"com.example.autojob.domain.Task","values":{"methodName":"这是一个测试方法名"}}]"
        }
    ],
    "totalnum": 1,
    "message": "查询成功"
}
获取任务调度器核心线程运行状态接口

功能说明:字面意思

URL:GET:/auto_job/get_executor_status

返回值示列:

{
    "code": 200,
    "data": [
        {
            "ThreadName": "autoUpdateFinishedListener",
            "status": "running"
        },
        {
            "ThreadName": "startTimeListener",
            "status": "running"
        },
        {
            "ThreadName": "taskFinishListener",
            "status": "running"
        },
        {
            "ThreadName": "autoScedulingTaskToDB",
            "status": "running"
        }
    ],
    "message": "success"
}
重启任务调度器接口

功能说明:关闭任务调度器的所有核心线程并重启,并且在心跳检测器重新注册心跳,之前的心跳信息将会全部清空。此 *** 作不会删除任务调度队列中的待执行任务,也不会终止正在执行的计划任务

URL:GET:/auto_job/restart_executor

返回值

{
    "code": 200,
    "message": "重启任务调度器成功"
}

控制台输出

autoUpdateFinishedListener的心跳注销成功
关闭线程:autoUpdateFinishedListener成功
startTimeListener的心跳注销成功
关闭线程:startTimeListener成功
taskFinishListener的心跳注销成功
关闭线程:taskFinishListener成功
执行器初始化......
执行器状态:{maxThreads=100, runCircle=1000, maxWaitTime=1, maxJoinWait=5000, longTimetaskDB=false, ifScedulingTasksFromDB=false, scedulingTasksFromDBCircle=1, scedulingTasksFromDBTime=5}
注册autoUpdateFinishedListener的心跳已完成
启动线程:autoUpdateFinishedListener成功
注册startTimeListener的心跳已完成
启动线程:startTimeListener成功
注册taskFinishListener的心跳已完成
启动线程:taskFinishListener成功
=============================================>自动更新完成任务监听器启动
=============================================>定时任务监听器启动
=============================================>任务关闭监听器启动
启动任务调度器接口

功能说明:如果任务调度器有某些核心线程未启动则启动

URL:GET:/auto_job/start_executor

返回值

{
    "code": 200,
    "message": "启动任务调度器成功"
}
强制停止任务调度器接口

功能说明:关闭任务调度器的所有核心线程,并且在心跳检测器注销心跳,之前的心跳信息将会全部清空。此 *** 作不会删除任务调度队列中的待执行任务,也不会终止正在执行的计划任务

URL:GET:/auto_job/stop_executor

返回值

{
    "code": 200,
    "message": "关闭任务调度器成功"
}
获取心跳检测器状态接口

功能说明:字面意思

URL:GET:/auto_job/get_heart_beat_status

返回值示列:

{
    "code": 200,
    "data": {
        "model": "BY_SELF",
        "status": "running"
    },
    "message": "success"
}
重启心跳检测器接口

功能说明:注销心跳检测器并且重新启动,该 *** 作不会注销线程的登记信息

URL:GET:/auto_job/get_heart_beat_status

返回值

{
    "code": 200,
    "message": "重启心跳检测器成功"
}

控制台输出

心跳检测器初始化......
心跳检测器状态:采用BY_SUBMIT心跳检测模式,{maxThreads=100, submitTime=BY_SUBMIT', repeatDoor=0.01, ifRepeatTask=true, maxRepeat=5, isRecordSubmitHeart=false, listenCircle=1000}
=============================================>线程提交式心跳检测器已启动

_status

返回值示列:

{
    "code": 200,
    "data": {
        "model": "BY_SELF",
        "status": "running"
    },
    "message": "success"
}
重启心跳检测器接口

功能说明:注销心跳检测器并且重新启动,该 *** 作不会注销线程的登记信息

URL:GET:/auto_job/get_heart_beat_status

返回值

{
    "code": 200,
    "message": "重启心跳检测器成功"
}

控制台输出

心跳检测器初始化......
心跳检测器状态:采用BY_SUBMIT心跳检测模式,{maxThreads=100, submitTime=BY_SUBMIT', repeatDoor=0.01, ifRepeatTask=true, maxRepeat=5, isRecordSubmitHeart=false, listenCircle=1000}
=============================================>线程提交式心跳检测器已启动

资源链接:https://download.csdn.net/download/qq_45060540/65332556
GItee地址:https://gitee.com/hyxl-520/autojob.git

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676521.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存