分布式调度

分布式调度,第1张

分布式调度 分布式调度

本博文可以分三部分进行学习,分布式调度的了解,Quartz 使用,Elastic-Job 使用。

分布式调度定义

分布式任务调度有两层含义

    运行在分布式集群环境下的调度任务(同一个定时任务部署多分,只应该有一个定时任务在执行)分布式调度–> 定时任务的分布式 --> 定时任务的拆分 (把一个大的作业任务拆分为多个小的作业)
定时任务的场景

每个一定时间,特定某一时刻执行

订单审核、出库订单超时⾃动取消、⽀付退款礼券同步、⽣成、发放作业物流信息推送、抓取作业、退换货处理作业数据积压监控、⽇志监控、服务可⽤性探测作业定时备份数据⾦融系统每天的定时结算数据归档、清理作业报表、离线数据分析作业 定时任务的实现方式

定时任务的实现方式有多种,早期没有定时任务框架的时候,我们会使用 JDK 中的 Timer 机制和多线程机制(Runnable + 线程休眠 sleep)来实现定时或者间隔一段时间执行某一段程序,后来有了定时任务框架 Quartz,使用 cron 表达式来进行定时任务。这里简单的描述 Quartz 持久化定时任务的实现,与本文的分布式调度关系不大,可以直接跳到分布式调度框架Elastic-Job

Maven 引入 jar 包

	
        1.1.18
        8.0.15
        3.4.0
        3.6.0
    

		
        
            org.springframework.boot
            spring-boot-starter-quartz
            2.3.8.RELEASE
        

		
            com.baomidou
            mybatis-plus-boot-starter
            ${mybatis.plus.version}
        

定时任务作业主要调度程序

@Service
public class ScheduleTaskServiceImpl extends ServiceImpl implements IScheduleTaskService, InitializingBean{

    // 定时任务管理
    private Scheduler scheduler;

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Override
    public boolean scheduleJob(ScheduleTask task) {

        try {
            JobKey jobKey = JobKey.jobKey(task.getTaskName(), task.getTaskGroup());
            Job job = (Job) ApplicationContextUtil.getBean(task.getJobClass());
            String cron = task.getCronexpression();
            Map jobMap = task.getJobDataMap();
            JobDetail jobDetail = QuartzUtil.getJobDetail(jobKey, task.getDescription(),
                    QuartzUtil.getJobDataMap(jobMap), job);
//            //前端不能传cron完整表达式,补充一位秒的占位
//            cron = "0/30 " + cron;
            Trigger trigger = QuartzUtil.getTrigger(jobKey, task.getDescription(),
                    QuartzUtil.getJobDataMap(jobMap), cron);
            // 开启定时任务
            if(scheduler.checkExists(jobKey)) {
                scheduler.deleteJob(jobKey);
            }
            scheduler.scheduleJob(jobDetail, trigger);
            if (scheduler.isShutdown()){
                scheduler.start();
            }
            return true;
        } catch (Exception e) {
            log.error("【定时器业务处理】添加定时任务失败,{}",ExceptionUtil.getMessage(e));
            return false;
        }
    }

    @Override
    public boolean startup(int id) {
        ScheduleTask scheduleTask = getById(id);
        return scheduleJob(scheduleTask);
    }

    @Override
    public void deleteJob(Integer id) {
        stop(id);
        baseMapper.deleteById(id);
    }

    @Override
    public void insertJob(ScheduleTask task) {
        if (task != null) {
            baseMapper.insert(task);
        }
    }

    @Override
    public List getScheduleJobList() {
        try {
            List result = baseMapper.selectList(null);
            return result;
        } catch (Exception e) {
            log.error("【定时器业务处理】查询任务失败n{}", ExceptionUtil.getMessage(e));
            return new ArrayList();
        }
    }

    @Override
    public ScheduleTask getTaskByGroupAndName(String group, String name) {
        return baseMapper.getTaskByGroupAndName(group, name);
    }

    @Override
    public boolean stop(ScheduleTask task) {
        String name = task.getTaskName();
        String group = task.getTaskGroup();
        JobKey jobKey = JobKey.jobKey(task.getTaskName(), task.getTaskGroup());
        try {
            scheduler.pauseJob(jobKey);
            return scheduler.deleteJob(JobKey.jobKey(name, group));
        } catch (SchedulerException e) {
            log.error("【定时器业务处理】关闭定时任务失败,name={}, group={}", name, group);
            throw new ServiceException("关闭定时任务失败");
        }
    }

    @Override
    public boolean stop(int id) {
        ScheduleTask scheduleTask = getById(id);
        return stop(scheduleTask);
    }

    @Override
    public List getByPlatformIdAndJobClass(int platformId, String jobClass) {
        QueryWrapper query = Wrappers.query();
        query.eq("platform_id", platformId)
                .eq("job_class", jobClass);
        return baseMapper.selectList(query);
    }


    @Override
        public void afterPropertiesSet() throws Exception {
        log.info("【定时器业务处理】初始化定时任务工厂");
        // 项目启动
        scheduler = schedulerFactoryBean.getScheduler();
        List taskList = baseMapper.selectList(null);
        try {
            for (ScheduleTask task : taskList) {
                if (task.getStatus() != 0) {
                    this.scheduleJob(task);
                }

            }
        } catch (Exception e) {
            log.error("【定时器业务处理】初始化任务失败n{}", ExceptionUtil.getMessage(e));
        }
    }

    @Override
    public boolean saveOrUpdate(ScheduleTask task) {
        if (task.getId() != null && task.getId() != 0) {
            ScheduleTask task2 = getById(task.getId());
            task.setTaskName(task2.getTaskName());
            task.setTaskGroup(task2.getTaskGroup());
            super.saveOrUpdate(task);
        } else {
            if (getByPlatformIdAndJobClass(task.getPlatformId(), task.getJobClass()).size() > 0 ) {
                throw new ServiceException("该平台定时器添加过多");
            }
            task.setTaskGroup("crawler");
            task.setTaskName(StringUtil.getUUID(6));
            task.setJobData(JSONObject.toJSONString(new HashMap() {{
                put("platformId", task.getPlatformId());
            }}));
            baseMapper.insert(task);
        }
        if (task.getStatus().equals(1)) {
            startup(task.getId());
        } else {
            stop(task.getId());
        }
        return true;
    }

}

主要的流程

通过 schedulerFactoryBean 创建一个调度器 Scheduler,用于调度任务创建任务 Job 与触发器 Trigger使用调度器 Scheduler 来调度作业

定时任务持久化对象 ScheduleTask

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "ScheduleTask对象", description = "定时任务表")
public class ScheduleTask implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    @ApiModelProperty(value = "任务名称")
    private String taskName;

    @ApiModelProperty(value = "组名")
    private String taskGroup;

    @ApiModelProperty(value = "描述")
    private String description;

    @ApiModelProperty(value = "cron表达式")
    private String cronexpression;

    @ApiModelProperty(value = "任务类型")
    private String jobClass;

    @ApiModelProperty(value = "备注")
    private String remark;

    @ApiModelProperty(value = "任务状态 (0-关闭,1-开启)")
    private Integer status;

    @ApiModelProperty(value = "任务延迟")
    private Integer delay;

    @ApiModelProperty(value = "修改时间")
    private LocalDateTime modifyTime;

    @ApiModelProperty(value = "创建时间")
    private LocalDateTime createTime;

    @ApiModelProperty(value = "平台主键")
    private Integer platformId;

    @ApiModelProperty(value = "任务所需参数,jsonstring 格式")
    private String jobData;

    @JsonIgnore
    public Map getJobDataMap() {
        Map result = new HashMap<>();
        result.put("delayTime", this.delay);
        JSONObject jobJson = JSONObject.parseObject(jobData);
        result.putAll(jobJson.getInnerMap());
        return result;
    }


    public String extRemark() {
        String[] cronTime = new String[]{"每%s秒执行一次", "每%s分钟执行一次", "每%s小时执行一次",
                "每%s天的随机点执行一次", "每%s月的1号随机点执行一次", "每天%s点执行一次", "默认每%s秒执行一次"};
        int cronRate = CronUtil.getCronRate(cronexpression);
        int cronCycle = CronUtil.getCronCycle(cronexpression);
        String format = String.format(cronTime[cronRate], cronCycle);
        return format;
    }
}

Quartz 的任务 Job 与触发器 Trigger 的创建工具

public class QuartzUtil {
    
    public static JobDetail getJobDetail(JobKey jobKey, String description, JobDataMap jobDataMap, Job jobClass) {
        return JobBuilder.newJob(jobClass.getClass())
                .withIdentity(jobKey)
                .withDescription(description)
                .setJobData(jobDataMap)
                .usingJobData(jobDataMap)
                .requestRecovery()
                .storeDurably()
                .build();
    }

    
    public static Trigger getTrigger(JobKey jobKey, String description, JobDataMap jobDataMap, String cronexpression) {
        return TriggerBuilder.newTrigger()
                .withIdentity(jobKey.getName(), jobKey.getGroup())
                .withDescription(description)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronexpression))
                .usingJobData(jobDataMap)
                .startNow()
                .build();
    }

    
    public static JobDataMap getJobDataMap(Map map) {
        return map == null ? new JobDataMap() : new JobDataMap(map);
    }


}
分布式调度框架Elastic-Job Elastic-Job 介绍

Elastic-Job 的 github 地址:https://github.com/elasticjob

网上的介绍,Elastic_Job 是当当网开源的一个分布式调度解决方案,基于 Quartz 二次开发的,功能非常丰富强大,采用 zookeeper 实现分布式调度,实现任务分片以及高可用。目前由两个相互独立的子项目 Elatstic-Job-Lite 和 Elastic-Job-Cloud 组成。目前说的是 Elastic-Job-Lite 的轻量级解决方案,使用 jar 的形式提供分布式任务的调度服务,而 Elastic-Job-Cloud 是结合 Mesos 以及 Docker 在云环境下使用,后期博文会有提及。

功能列表:

**分布式调度协调:**在分布式环境中,任务能够按指定的调度策略执行,并能够避免同一任务多实例重复执行**丰富调度策略:**基于成熟的定时任务作业框架 Quartz cron 表达式执⾏定时任务**d性扩容缩容:**当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。**失效转移:**某实例在任务执行失败后,会被转移到其他实例执行。**错过执⾏作业重触发:**若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。**⽀持并⾏调度:**支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。**作业分⽚⼀致性:**当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。支持作业生命周期 *** 作:可以动态对任务进行开启及停止 *** 作。丰富的作业类型:支持 Simple、DataFlow、script 三种作业类型,后续会有详细介绍。Spring 整合以及命名空间支持:对Spring支持良好的整合方式,支持 Spring 自定义命名空间,支持占位符。运维平台:提供运维界面,可以管理作业和注册中心。

下边就围绕以上的功能进行实践使用。

Elastic-Job 使用 Zookeeper 搭建使用

Elastic-Job 依赖于 Zookeeper 尽心分布式协调。需要安装 3.4.6 版本以上。我这里安装的是 3.7.0 版本。下载地址

目前使用的是单例 zk,默认端口 2181

解压后的目录
bin/
conf/
docs/
lib/
LICENSE.txt
NOTICE.txt
README.md
README_packaging.md

在 conf 路径下创建配置文件

$ cp zoo_sample.cfg zoo.cfg

基本命令:

启动 ./zkServer.sh start

停⽌ ./zkServer.sh stop

查看状态 ./zkServer.sh status

推荐使用的

maven

    com.dangdang
    elastic-job-lite-core
    2.1.5

程序开发

代码示例地址:https://gitee.com/teaegg/elastic-job-test.git

具体实现可以看代码,主要测试步骤是启动多个进程。

轻量级去中心化

Elastic—job的两个特点

轻量级

所有的实现都在 jar 中,必须的依赖仅仅是 zookeeper并非独立部署的中间件,就像 jar 程序 去中心化

执行任务的节点对等,存在不一样的是分片定时调度自触发,没有中心调度节点的分配服务自发现,没有通过注册中心的服务发现主节点非固定 任务分片与失效转移

目前存在一个问题,需要处理一亿的数据,如果用一个作业点来处理需要花费很长的时间。Elastic-Job 可以把这个巨大的作业点划分成多个作业点,作业点的处理逻辑可以自行决定。划分的策略是平均区分,也可以定制。本次实现的是划分了3个任务

        // 配置切片任务 (调度器、任务业务逻辑、触发器)
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder("elastic-job-name", "*/10 * * * * ?", 3)
                .shardingItemParameters("0=task0,1=task1,2=task2").build();

启动后的打印

如果其中一台机器或程序出现问题,会把任务转移到其他程序,例如下图:

停止了程序 3之后,任务被转移到程序 2中。

d性扩容

如果程序 3重新启动注册到 zk 中,注册中⼼会通知 Elastic-Job 进⾏重新分⽚,3个分片又会重新平均的分配到各个实例中去。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5711176.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存