Elastic-job支持基于有向无环图(DAG)的作业依赖技术方案

Elastic-job支持基于有向无环图(DAG)的作业依赖技术方案,第1张

Elastic-job是当当网架构师基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。


Elastic-job除了支持单个作业按照规定的时间触发外,还可以将多个作业通过有向无环图(DAG)的方式生成依赖关系,然后按照DAG中的依赖关系依次调度执行这些作业。


下面说一下实现这个功能的技术方案。


作业的定义

假设在一个Elastic-job集群中,有三个作业A,B,C。


它们的依赖关系依次是A-->B-->C。


每个作业都有一个JobConfiguration对象来记录它们的基本属性,JobConfiguration定义如下:

public final class JobConfiguration {
    
    private final String jobName;
    
    private final String cron;
    
    private final int shardingTotalCount;
    
    private final String shardingItemParameters;
    
    private final String jobParameter;
    
    private final boolean monitorExecution;
    
    private final boolean failover;
    
    private final boolean misfire;
    
    private final int maxTimeDiffSeconds;
    
    private final int reconcileIntervalMinutes;
    
    private final String jobShardingStrategyType;
    
    private final String jobExecutorServiceHandlerType;
    
    private final String jobErrorHandlerType;
    
    private final String description;
    
    private final Properties props;
    
    private final boolean disabled;
    
    private final boolean overwrite;

    private final JobDagConfiguration jobDagConfiguration;
}

public final class JobDagConfiguration {
    /** DAG 名称 */
    private String dagName;

    /** DAG中该作业依赖的上一级作业. */
    private String dagDependencies;

    /** DAG 重试次数. */
    private int retryTimes;

    /** DAG 重试间隔. */
    private int retryInterval;

    /** 是否可以单独运行.  */
    private boolean dagRunAlone;

    /** 本作业执行失败,是否可以执行后面的作业 */
    private boolean dagSkipWhenFail;
}

在JobConfiguration这个类中增加了属性 JobDagConfiguration,JobDagConfiguration中定义了作业的依赖关系。


      dagName字段表示DAG的名称,对于A,B,C三个作业而言,它们属于同一个DAG,它们的dagName相同,姑且都设置为dagNameXXXX。


      dagDependencies字段表示作业依赖的上一级作业。


A作业是第一个作业,所以dagDependencies=self;B作业依赖A,所以B作业的dagDependencies=A;C作业依赖B,所以C作业的dagDependencies=B。


      retryTimes表示重试次数,如果某个作业执行失败,还可以重试retryTimes次。


      dagSkipWhenFail表示某个作业执行失败了,是否执行后面的作业,dagSkipWhenFail=false表示停止执行。


作业的执行

    Elastic-job的执行流程在类ElasticJobExecutor中完成,具体流程如下:

public final class ElasticJobExecutor {
     /**
     * 执行作业
     */
    public void execute() {
        try {
            判断job的执行环境,本地时间和zk中的时间差,如果小于规定的时间就可以执行
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException ex) {
            jobErrorHandler.handleException(jobConfig.getJobName(), ex);
        }
        //判断是否是DAG类型的作业,如果是需要检查作业的状态和依赖关系,生成作业关系图
        if (jobFacade.isDagJob()) {
            try {
                jobFacade.checkDagStates();
                jobFacade.checkDagJobDependencies();
                //CHECKSTYLE:OFF
            } catch (final Exception ex) {
                //CHECKSTYLE:ON
                log.error("DAG job - {} exception! Check !", jobConfig.getJobName(), ex);
                return;
            }
        }
        //获取当前实例执行的上下文,包括分片信息等重要内容
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        //如果有任务正在执行,就退出,下次执行的时候在补充执行
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                    shardingContexts.getShardingItemParameters().keySet()));
            return;
        }
        try {
            //执行作业前的监听器
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable ex) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), ex);
        }
        //执行作业
        execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
        //执行之前错过执行的作业
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(shardingContexts, ExecutionSource.MISFIRE);
        }
        //失效转移,执行其它执行器因为宕机而没有执行的作业
        jobFacade.failoverIfNecessary();
        //任务执行后的监听器
        try {
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable ex) {
            //CHECKSTYLE:ON
            jobErrorHandler.handleException(jobConfig.getJobName(), ex);
        }
    }
}

从上面的执行流程可以看出,DAG类型在作业在执行之前需要检查DAG的状态和依赖关系。


状态检查中,如果当前DAG状态不是running,则会进行初始化,生成作业依赖关系图。


依赖关系检查中,如果依赖关系中表示当前作业不能执行,则会抛出异常,作业就会立即退出。


如果依赖关系中可以执行,就会像普通类型的作业一样,进入后面的作业执行流程。


DAG依赖关系的检查流程如下:

    //检查DAG的状态
    public void checkDagStates() {
        //如果DAG状态为running,则退出
        if (dagService.getDagStates() == DagStates.RUNNING) {
            return;
        }
        //如果状态为pause,则说明DAG为中止状态,抛出异常
        if (dagService.getDagStates() == DagStates.PAUSE) {
            throw new DagRuntimeException("Dag Job states PAUSE");
        }
        //如果是DAG的第一级作业,则进行初始化 *** 作、修改DAG的状态为running
        if (dagService.isDagRootJob()) {
            dagService.changeDagStatesAndReGraph();
        }
        //不是running,抛出异常
        if (dagService.getDagStates() != DagStates.RUNNING) {
            log.info("DAG states not right {}", dagService.getDagStates().getValue());
            throw new DagRuntimeException("Dag states Not right!");
        }
    }

其中函数changeDagStatesAndReGraph()是DAG任务的一些初始化 *** 作,具体如下:

public void changeDagStatesAndReGraph() {
        ......

        try {
            //获取本次执行任务的批次号
            String batchNo = getBatchNo();
            //获取所有作业的依赖关系,Map中的key是作业名称,value是依赖的作业。


Map> allDagNode = dagNodeStorage.getAllDagConfigJobs(); //检查是否有环 checkCycle(allDagNode); //根据依赖关系生成图信息 dagNodeStorage.initDagGraph(allDagNode, batchNo); //更新DAG状态 dagNodeStorage.updateDagStates(DagStates.RUNNING); //更新DAG中job的状态 dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING); } catch (Exception ex) { ...... } finally { releaseDagLeader(); } }

在函数initDagGraph中,会将作业依赖的上级作业写入zookeeper中。


在检查完DAG的状态后,还需要在函数checkDagJobDependencies中检查作业的依赖关系。


如果某个作业依赖的作业还没有完成,就抛出异常,中止本次执行。


作业的触发

在DAG中,一个作业执行完成以后,会通过zookeeper的监听机制触发下一级的作业执行。


通过在zookeeper上注册监听函数,在作业执行完以后,会触发启动下一级作业的 *** 作,具体如下:

public void event(final Type type, final ChildData oldData, final ChildData data) {
        if (!(type == Type.NODE_CHANGED || type == Type.NODE_CREATED)) {
            return;
        }
        //获取本次作业执行后的状态
        JobStateEnum jobState = JobStateEnum.of(new String(data.getData()));
        //正在执行中,,则退出
        if (jobState == JobStateEnum.RUNNING || jobState == JobStateEnum.NONE) {
            log.debug("Dag-{}[{}] receive job state EVENT NOT last state-[{}], skip.", dagName, jobName, jobState);
            return;
        }

        // 执行失败,则重试
        if (jobState == JobStateEnum.FAIL) {
            if (retryJob()) {
                log.info("Dag-{}[{}] Put FAIL job to DQ Success! Waiting Triggered!", dagName, jobName);
                return;
            }
        } else {
            dagNodeStorage.updateDagJobStates(jobState);
        }

      

        //触发本作业的下一级作业
        List willTriggerJobs = nextShouldTriggerJob();

        // 下一级作业为空,则DAG中的作业全部执行完成,更新DAG状态
        if (willTriggerJobs.isEmpty()) {
            if (hasNoJobRunning()) {
                log.info("Dag-{}, No job running, Start statistics The DAG State.", dagName);
                statisticsDagState();
                return;
            }
         
        } else {
           //下一级作业不为空,则触发下一级作业执行。


willTriggerJobs.forEach(job -> { postEvent("trigger", "Trigger Job"); dagNodeStorage.triggerJob(job); }); } }

通过上面的代码可以看出,每个作业执行完成之后都会检查是否有下一级作业要执行,如果有就触发执行,否则说明DAG中任务执行完成,更新DAG状态为完成,结束所有任务执行。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/607212.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-14
下一篇 2022-04-14

发表评论

登录后才能评论

评论列表(0条)

保存