At the end of the previous article, once the stages had been divided and the best locations for the tasks computed, taskScheduler.submitTasks is called to create a TaskSet object and submit it:
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
By default, in standalone mode, TaskScheduler is just a trait; the implementation actually used is TaskSchedulerImpl.
Open TaskSchedulerImpl and find its submitTasks method:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create one TaskSetManager per TaskSet; it is responsible for monitoring and managing that TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // Then put it into the cache, keyed by stage id and stage attempt id
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // As we saw in the SparkContext source: when the TaskScheduler is created, a SparkDeploySchedulerBackend
  // is created for TaskSchedulerImpl. This backend is the one created earlier, and it is responsible for
  // creating the AppClient and registering the Application with the Master.
  backend.reviveOffers()
}
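To make the bookkeeping above concrete, here is a minimal standalone sketch, using hypothetical FakeTaskSet/FakeManager types rather than Spark's real classes, of the stage-to-attempt map and the "only one active TaskSet per stage" check:

import scala.collection.mutable

case class FakeTaskSet(stageId: Int, attemptId: Int)
class FakeManager(val taskSet: FakeTaskSet, var isZombie: Boolean = false)

object TaskSetBookkeepingSketch {
  // stageId -> (attemptId -> manager), mirroring the nested map used in submitTasks
  private val byStageAndAttempt = mutable.HashMap[Int, mutable.HashMap[Int, FakeManager]]()

  def register(taskSet: FakeTaskSet): FakeManager = {
    val manager = new FakeManager(taskSet)
    val attempts = byStageAndAttempt.getOrElseUpdate(taskSet.stageId, mutable.HashMap())
    attempts(taskSet.attemptId) = manager
    // A stage may only have one non-zombie (still active) task set at any time
    val conflicting = attempts.exists { case (_, m) => m.taskSet != taskSet && !m.isZombie }
    if (conflicting) {
      throw new IllegalStateException(s"more than one active taskSet for stage ${taskSet.stageId}")
    }
    manager
  }

  def main(args: Array[String]): Unit = {
    register(FakeTaskSet(stageId = 0, attemptId = 0))
    println("registered attempt 0 of stage 0")
  }
}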
Step into createTaskSetManager:
// Label as private[scheduler] to allow tests to swap in different task set managers if necessary
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  new TaskSetManager(this, taskSet, maxTaskFailures)
}
What is a TaskSetManager?
It schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of each task, retries tasks if they fail (up to a limited number of times), and handles locality-aware scheduling for this TaskSet via delay scheduling. Its main interfaces are resourceOffer, which asks the TaskSet whether it wants to run a task on a given node, and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
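As a toy illustration of the retry part of that description (a hypothetical RetrySketch class, not Spark's TaskSetManager), each task index keeps a failure counter and stops being re-offered once it has failed maxTaskFailures times:

import scala.collection.mutable

class RetrySketch(maxTaskFailures: Int) {
  private val failures = mutable.Map[Int, Int]().withDefaultValue(0)

  /** Records a failure; returns true if the task may still be re-offered. */
  def onTaskFailed(taskIndex: Int): Boolean = {
    failures(taskIndex) += 1
    failures(taskIndex) < maxTaskFailures
  }
}

object RetrySketch {
  def main(args: Array[String]): Unit = {
    val tracker = new RetrySketch(maxTaskFailures = 4)
    // The first three failures of task 0 still allow a retry; the fourth exhausts the budget
    println((1 to 4).map(_ => tracker.onTaskFailed(0)))  // Vector(true, true, true, false)
  }
}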
Now look at the last statement of submitTasks: backend.reviveOffers().
Go into CoarseGrainedSchedulerBackend and find the reviveOffers method:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
You can see that it uses driverEndpoint to send a ReviveOffers message.
Go into driverEndpoint:
var driverEndpoint: RpcEndpointRef = null

protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}
Keep following createDriverEndpointRef until you reach DriverEndpoint, and find where it handles the ReviveOffers message:
case ReviveOffers => makeOffers()
Step into makeOffers:
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  // First, keep only the executors that are alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // For the alive executors, collect the resources currently available on them
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // Call the scheduler's resourceOffers to run the task-assignment algorithm and assign tasks to executors,
  // then call launchTasks to send a LaunchTask message for each assigned task to its executor,
  // which starts and runs the task
  launchTasks(scheduler.resourceOffers(workOffers))
}
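The offer-building step can be pictured with a small self-contained sketch (hypothetical ExecInfo/Offer case classes, not Spark's types): keep only the live executors and expose each one's host and free cores:

case class ExecInfo(host: String, freeCores: Int, alive: Boolean)
case class Offer(executorId: String, host: String, cores: Int)

object MakeOffersSketch {
  // Keep only live executors, then turn each into an offer of its host and free cores
  def makeOffers(executors: Map[String, ExecInfo]): IndexedSeq[Offer] =
    executors.collect {
      case (id, e) if e.alive => Offer(id, e.host, e.freeCores)
    }.toIndexedSeq

  def main(args: Array[String]): Unit = {
    val executors = Map(
      "exec-1" -> ExecInfo("host-a", freeCores = 4, alive = true),
      "exec-2" -> ExecInfo("host-b", freeCores = 0, alive = false))
    println(makeOffers(executors))  // only exec-1 produces an offer
  }
}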
First look at resourceOffers; what is passed in is exactly the set of resources available on the executors:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  // Shuffle the executors first, for load balancing
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // Create the per-worker task lists that will be handed out
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Take the sorted TaskSets out of the scheduling pool -- the pool created when SparkContext was initialized.
  // Every TaskSet that gets created is put into this pool, and the task-assignment algorithm
  // pulls TaskSets from it, so assignment is done one TaskSet at a time.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Below is the core task-assignment algorithm
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // Each TaskSet is traversed starting from its strictest locality level
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      // For each TaskSet, try each locality level in turn and launch its tasks on executors at that level.
      // When no more tasks can be launched, the do-while loop exits and the next (looser) level is tried.
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
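The locality-descent loop at the heart of this method can be isolated into a simplified sketch (hypothetical types and names, not Spark's): for one TaskSet, keep launching at the strictest locality level until nothing more can be launched there, then fall back to the next looser level:

object LocalityLoopSketch {
  sealed trait Locality
  case object ProcessLocal extends Locality
  case object NodeLocal extends Locality
  case object AnyLocality extends Locality

  /** tryLaunch returns true if at least one more task could be launched at the given level. */
  def scheduleOneTaskSet(levels: Seq[Locality], tryLaunch: Locality => Boolean): Boolean = {
    var launchedAnyTask = false
    for (level <- levels) {
      var launchedAtThisLevel = false
      do {
        launchedAtThisLevel = tryLaunch(level)
        launchedAnyTask |= launchedAtThisLevel
      } while (launchedAtThisLevel)
    }
    launchedAnyTask
  }

  def main(args: Array[String]): Unit = {
    // Pretend only two launches are possible, and none of them at PROCESS_LOCAL
    var budget = 2
    val launched = scheduleOneTaskSet(Seq(ProcessLocal, NodeLocal, AnyLocality), level =>
      if (level != ProcessLocal && budget > 0) { budget -= 1; true } else false)
    println(launched)  // true
  }
}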
Step into the resourceOfferSingleTaskSet method:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // Iterate over all executors
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // If this executor still has enough CPUs for a task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSet for a task to run on this executor, at the given locality level
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Put it into tasks: the tasks that the given executor should launch.
          // At this point the task assignment is done.
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          // Mark that at least one task was launched
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
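The per-executor CPU accounting can be shown in isolation with a rough sketch (hypothetical names; the real method hands out at most one task per executor per call, the sketch only keeps the core-budget idea): an executor is offered more work only while it still has at least CPUS_PER_TASK free cores, and each assignment consumes that many cores:

object CpuAccountingSketch {
  val CPUS_PER_TASK = 1

  /** Greedily hands out task ids across executors, charging CPUS_PER_TASK cores per assignment. */
  def assign(availableCpus: Array[Int], pendingTasks: Int): Seq[(Int, Int)] = {
    val assignments = scala.collection.mutable.ArrayBuffer[(Int, Int)]()  // (executorIndex, taskId)
    var nextTask = 0
    for (i <- availableCpus.indices) {
      while (availableCpus(i) >= CPUS_PER_TASK && nextTask < pendingTasks) {
        assignments += ((i, nextTask))
        availableCpus(i) -= CPUS_PER_TASK
        nextTask += 1
      }
    }
    assignments.toList
  }

  def main(args: Array[String]): Unit = {
    // Two executors with 2 and 1 free cores can take at most 3 of the 5 pending tasks
    println(assign(Array(2, 1), pendingTasks = 5))  // List((0,0), (0,1), (1,2))
  }
}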
Back in makeOffers, look at launchTasks:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task into a byte array
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // Find the corresponding executor
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send a LaunchTask message to the executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
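The size guard at the top of launchTasks can be mimicked with a standalone sketch (illustrative names, plain Java serialization instead of Spark's serializer): serialize the payload and refuse to send it if it reaches a configured RPC limit, mirroring the role of spark.rpc.message.maxSize:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object RpcSizeGuardSketch {
  val maxRpcMessageBytes: Int = 128 * 1024 * 1024  // e.g. 128 MB, roughly the spirit of spark.rpc.message.maxSize

  def serialize(payload: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(payload) finally out.close()
    bytes.toByteArray
  }

  /** Returns an error message if the payload is too big to send, otherwise its size in bytes. */
  def trySend(payload: java.io.Serializable): Either[String, Int] = {
    val serialized = serialize(payload)
    if (serialized.length >= maxRpcMessageBytes) {
      Left(s"Serialized task was ${serialized.length} bytes, exceeds max allowed $maxRpcMessageBytes bytes")
    } else {
      Right(serialized.length)  // in Spark, this is the branch where the LaunchTask message is sent
    }
  }

  def main(args: Array[String]): Unit = {
    println(trySend("a small task payload"))  // Right(<payload size in bytes>)
  }
}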