接下来看看作业是如何提交的,也比较简单,就是调用了processDao.submitTask。
protected TaskInstance submit(){ Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES, Constants.defaultMasterCommitRetryTimes); Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL, Constants.defaultMasterCommitRetryInterval); int retryTimes = 1; while (retryTimes <= commitRetryTimes){ try { TaskInstance task = processDao.submitTask(taskInstance, processInstance); if(task != null){ return task; } logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes); Thread.sleep(commitRetryInterval); } catch (Exception e) { logger.error("task commit to mysql and queue failed : " + e.getMessage(),e); } retryTimes += 1; } return null; }
根据前面的分析我们知道processDao就是跟数据库打交道的,暂时猜测这里就是把任务实例插入到了数据。
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){ logger.info("start submit task : {}, instance id:{}, state: {}, ", taskInstance.getName(), processInstance.getId(), processInstance.getState() ); processInstance = this.findProcessInstanceDetailById(processInstance.getId()); //submit to mysql TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance); if(task.isSubProcess() && !task.getState().typeIsFinished()){ ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task); TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); MapsubProcessParam = JSONUtils.toMap(taskNode.getParams()); Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task); }else if(!task.getState().typeIsFinished()){ //submit to task queue task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); submitTaskToQueue(task); } logger.info("submit task :{} state:{} complete, instance id:{} state: {} ", taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task;
这段代码优点复杂,但只需要看其主干逻辑就好了。也就是调用submitTaskInstanceToMysql把任务实例插入到数据库,然后调用submitTaskToQueue(目前还看不出插入到了哪里)。
submitTaskInstanceToMysql不再贴源码分析,与函数名差不多,就是把instance插入到数据库。
submitTaskToQueue主干逻辑就是把taskInstance添加到了TaskQueue。
public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ // task cannot submit when running if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName())); return true; } if(checkTaskExistsInTaskQueue(taskInstance)){ logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName())); return true; } logger.info("task ready to queue: {}" , taskInstance); taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) ); return true; }catch (Exception e){ logger.error("submit task to queue Exception: ", e); logger.error("task queue error : %s", JSONUtils.toJson(taskInstance)); return false; } }
taskQueue是TaskQueueFactory.getTaskQueueInstance()创建的。
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
getTaskQueueInstance其实就是调用了TaskQueueZkImpl.getInstance(),这应该是一个遗留接口。估计设计初期是想根据配置创建不同的任务队列,比如redis或者其他,目前只支持zookeeper。
public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { logger.info("task queue impl use zookeeper "); return TaskQueueZkImpl.getInstance(); }else{ logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit "); System.exit(-1); } return null; }
既然只支持zookeeper,这段代码是冗余的
这样来看submitTaskToQueue就是调用TaskQueueZkImpl.add方法,把任务实例插入到了zookeeper实现的队列中。
public void add(String key, String value) { try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); } }
从上下文我们知道,这里的key就是tasks_queue;根据注释,value就是 ${processInstancePriority}${processInstanceId}${taskInstancePriority}_${taskId}_host1,host2...
这样来看,add就是在zk的tasks_queue父节点下创建子节点,子节点的data就是value的值。
submit的逻辑分析完毕,来继续分析submitWaitComplete的剩余主要逻辑:waitTaskQuit。
waitTaskQuit代码比较多,先从整体来分析其逻辑:
- 通过taskInstance.id查询taskInstance。其实就是查询taskInstance的最新状态。
- 通过参数判断是否启用超时检查
- 一个while“死循环”。
- while中判断任务是否执行结束,是则退出
- 获取任务实例、流程实例最新状态
- 休眠1秒,进入下一次while循环
简单来说waitTaskQuit就是循环查看任务实例的状态,直至其成功。
MasterTaskExecThread的功能整体来看就是把任务实例信息插入到数据库,并放到zookeeper队列,然后循环等待任务实例的状态变成完成,并没有任何具体的执行逻辑。
Stopper.isRunning()作为一个全局变量,控制了N多的线程,每个线程都处于一个while“死循环”中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)