Elastic-job是当当网架构师基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。
Elastic-job除了支持单个作业按照规定的时间触发外,还可以将多个作业通过有向无环图(DAG)的方式生成依赖关系,然后按照DAG中的依赖关系依次调度执行这些作业。
下面说一下实现这个功能的技术方案。
假设在一个Elastic-job集群中,有三个作业A,B,C。
它们的依赖关系依次是A-->B-->C。
每个作业都有一个JobConfiguration对象来记录它们的基本属性,JobConfiguration定义如下:
public final class JobConfiguration {
private final String jobName;
private final String cron;
private final int shardingTotalCount;
private final String shardingItemParameters;
private final String jobParameter;
private final boolean monitorExecution;
private final boolean failover;
private final boolean misfire;
private final int maxTimeDiffSeconds;
private final int reconcileIntervalMinutes;
private final String jobShardingStrategyType;
private final String jobExecutorServiceHandlerType;
private final String jobErrorHandlerType;
private final String description;
private final Properties props;
private final boolean disabled;
private final boolean overwrite;
private final JobDagConfiguration jobDagConfiguration;
}
public final class JobDagConfiguration {
/** DAG 名称 */
private String dagName;
/** DAG中该作业依赖的上一级作业. */
private String dagDependencies;
/** DAG 重试次数. */
private int retryTimes;
/** DAG 重试间隔. */
private int retryInterval;
/** 是否可以单独运行. */
private boolean dagRunAlone;
/** 本作业执行失败,是否可以执行后面的作业 */
private boolean dagSkipWhenFail;
}
在JobConfiguration这个类中增加了属性 JobDagConfiguration,JobDagConfiguration中定义了作业的依赖关系。
dagName字段表示DAG的名称,对于A,B,C三个作业而言,它们属于同一个DAG,它们的dagName相同,姑且都设置为dagNameXXXX。
dagDependencies字段表示作业依赖的上一级作业。
A作业是第一个作业,所以dagDependencies=self;B作业依赖A,所以B作业的dagDependencies=A;C作业依赖B,所以C作业的dagDependencies=B。
retryTimes表示重试次数,如果某个作业执行失败,还可以重试retryTimes次。
dagSkipWhenFail表示某个作业执行失败了,是否执行后面的作业,dagSkipWhenFail=false表示停止执行。
Elastic-job的执行流程在类ElasticJobExecutor中完成,具体流程如下:
public final class ElasticJobExecutor {
/**
* 执行作业
*/
public void execute() {
try {
判断job的执行环境,本地时间和zk中的时间差,如果小于规定的时间就可以执行
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException ex) {
jobErrorHandler.handleException(jobConfig.getJobName(), ex);
}
//判断是否是DAG类型的作业,如果是需要检查作业的状态和依赖关系,生成作业关系图
if (jobFacade.isDagJob()) {
try {
jobFacade.checkDagStates();
jobFacade.checkDagJobDependencies();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
log.error("DAG job - {} exception! Check !", jobConfig.getJobName(), ex);
return;
}
}
//获取当前实例执行的上下文,包括分片信息等重要内容
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
//如果有任务正在执行,就退出,下次执行的时候在补充执行
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
shardingContexts.getShardingItemParameters().keySet()));
return;
}
try {
//执行作业前的监听器
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable ex) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), ex);
}
//执行作业
execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
//执行之前错过执行的作业
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, ExecutionSource.MISFIRE);
}
//失效转移,执行其它执行器因为宕机而没有执行的作业
jobFacade.failoverIfNecessary();
//任务执行后的监听器
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable ex) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), ex);
}
}
}
从上面的执行流程可以看出,DAG类型在作业在执行之前需要检查DAG的状态和依赖关系。
状态检查中,如果当前DAG状态不是running,则会进行初始化,生成作业依赖关系图。
依赖关系检查中,如果依赖关系中表示当前作业不能执行,则会抛出异常,作业就会立即退出。
如果依赖关系中可以执行,就会像普通类型的作业一样,进入后面的作业执行流程。
DAG依赖关系的检查流程如下:
//检查DAG的状态
public void checkDagStates() {
//如果DAG状态为running,则退出
if (dagService.getDagStates() == DagStates.RUNNING) {
return;
}
//如果状态为pause,则说明DAG为中止状态,抛出异常
if (dagService.getDagStates() == DagStates.PAUSE) {
throw new DagRuntimeException("Dag Job states PAUSE");
}
//如果是DAG的第一级作业,则进行初始化 *** 作、修改DAG的状态为running
if (dagService.isDagRootJob()) {
dagService.changeDagStatesAndReGraph();
}
//不是running,抛出异常
if (dagService.getDagStates() != DagStates.RUNNING) {
log.info("DAG states not right {}", dagService.getDagStates().getValue());
throw new DagRuntimeException("Dag states Not right!");
}
}
其中函数changeDagStatesAndReGraph()是DAG任务的一些初始化 *** 作,具体如下:
public void changeDagStatesAndReGraph() {
......
try {
//获取本次执行任务的批次号
String batchNo = getBatchNo();
//获取所有作业的依赖关系,Map中的key是作业名称,value是依赖的作业。
Map> allDagNode = dagNodeStorage.getAllDagConfigJobs();
//检查是否有环
checkCycle(allDagNode);
//根据依赖关系生成图信息
dagNodeStorage.initDagGraph(allDagNode, batchNo);
//更新DAG状态
dagNodeStorage.updateDagStates(DagStates.RUNNING);
//更新DAG中job的状态
dagNodeStorage.updateDagJobStates(JobStateEnum.RUNNING);
} catch (Exception ex) {
......
} finally {
releaseDagLeader();
}
}
在函数initDagGraph中,会将作业依赖的上级作业写入zookeeper中。
在检查完DAG的状态后,还需要在函数checkDagJobDependencies中检查作业的依赖关系。
如果某个作业依赖的作业还没有完成,就抛出异常,中止本次执行。
在DAG中,一个作业执行完成以后,会通过zookeeper的监听机制触发下一级的作业执行。
通过在zookeeper上注册监听函数,在作业执行完以后,会触发启动下一级作业的 *** 作,具体如下:
public void event(final Type type, final ChildData oldData, final ChildData data) {
if (!(type == Type.NODE_CHANGED || type == Type.NODE_CREATED)) {
return;
}
//获取本次作业执行后的状态
JobStateEnum jobState = JobStateEnum.of(new String(data.getData()));
//正在执行中,,则退出
if (jobState == JobStateEnum.RUNNING || jobState == JobStateEnum.NONE) {
log.debug("Dag-{}[{}] receive job state EVENT NOT last state-[{}], skip.", dagName, jobName, jobState);
return;
}
// 执行失败,则重试
if (jobState == JobStateEnum.FAIL) {
if (retryJob()) {
log.info("Dag-{}[{}] Put FAIL job to DQ Success! Waiting Triggered!", dagName, jobName);
return;
}
} else {
dagNodeStorage.updateDagJobStates(jobState);
}
//触发本作业的下一级作业
List willTriggerJobs = nextShouldTriggerJob();
// 下一级作业为空,则DAG中的作业全部执行完成,更新DAG状态
if (willTriggerJobs.isEmpty()) {
if (hasNoJobRunning()) {
log.info("Dag-{}, No job running, Start statistics The DAG State.", dagName);
statisticsDagState();
return;
}
} else {
//下一级作业不为空,则触发下一级作业执行。
willTriggerJobs.forEach(job -> {
postEvent("trigger", "Trigger Job");
dagNodeStorage.triggerJob(job);
});
}
}
通过上面的代码可以看出,每个作业执行完成之后都会检查是否有下一级作业要执行,如果有就触发执行,否则说明DAG中任务执行完成,更新DAG状态为完成,结束所有任务执行。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)