2021SC@SDUSC
Creating and Starting DAGScheduler
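DAGScheduler does the preparatory work before tasks are formally handed to TaskSchedulerImpl for submission. This includes creating the Job, dividing the RDDs in the DAG into Stages, and submitting those Stages. DAGScheduler is created as follows: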
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
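DAGScheduler's data structures mainly track the mapping between jobId and stageId, the Stages, the ActiveJobs, and the cached locations of each RDD's partitions. The code is as follows: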
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private val failedEpoch = new HashMap[String, Long]

private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
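The ID counters and the jobId-to-stageId map above can be illustrated with a minimal, self-contained sketch. The class `IdRegistry` and its methods `newJobId` and `newStageFor` are hypothetical names invented for this example and are not part of Spark; the sketch only shows how an `AtomicInteger` counter and a `HashMap[Int, HashSet[Int]]` might work together.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical stand-in for the scheduler's bookkeeping; not Spark's actual class.
class IdRegistry {
  private val nextJobId = new AtomicInteger(0)
  private val nextStageId = new AtomicInteger(0)

  // Maps each job ID to the set of stage IDs that belong to it.
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

  // Allocate a job ID; getAndIncrement is atomic, so concurrent callers get distinct IDs.
  def newJobId(): Int = nextJobId.getAndIncrement()

  // Allocate a stage ID and record that it belongs to the given job.
  def newStageFor(jobId: Int): Int = {
    val stageId = nextStageId.getAndIncrement()
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stageId
    stageId
  }

  // Total number of job IDs handed out so far.
  def numTotalJobs: Int = nextJobId.get()
}
```

Because job IDs and stage IDs come from separate counters, a stage ID is unique across all jobs, which is what lets a single map such as `stageIdToStage` index every stage.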
While DAGScheduler is being constructed, it calls the initializeEventProcessActor method to create the DAGSchedulerEventProcessActor. The code is as follows:
private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()
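The "ask, then block until the reply arrives" step can be sketched with nothing but the Scala standard library. This is only an illustration under stated assumptions: `WorkerRef` and `requestWorker` are hypothetical stand-ins for the ActorRef and for the supervisor's reply, and a plain `Future` replaces the Akka ask (`?`) used in the real code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingInitSketch {
  // Hypothetical stand-in for the ActorRef that the supervisor sends back.
  final case class WorkerRef(name: String)

  // Simulates the supervisor creating the child asynchronously and replying with a reference.
  def requestWorker(name: String): Future[WorkerRef] = Future { WorkerRef(name) }

  // Blocks the calling thread until the reply arrives or the timeout expires.
  // Await.result throws a TimeoutException if no reply comes in time.
  def initWorker(name: String): WorkerRef =
    Await.result(requestWorker(name), 30.seconds)
}
```

Blocking here is a deliberate trade-off: the constructor does not return until the reference is set, so code that submits a job later never observes a null `eventProcessActor`.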
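Here DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As the first code listing below shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem calls the preStart method of DAGSchedulerEventProcessActor, and as a result taskScheduler holds a reference to dagScheduler (see the second code listing below). The second listing also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent. DAGSchedulerEventProcessActor takes a different action for each message it receives.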
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
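The dispatch style of the receive method, one case per message type with each case delegating to a handler, can be tried out without Akka. The following is a simplified sketch, not Spark's code: the event classes carry only an ID, and `EventHandler` with its `log` buffer is a hypothetical stand-in for the scheduler's handler methods.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified, hypothetical event types; the real Spark events carry many more fields.
sealed trait SchedulerEvent
final case class JobSubmitted(jobId: Int) extends SchedulerEvent
final case class StageCancelled(stageId: Int) extends SchedulerEvent
case object AllJobsCancelled extends SchedulerEvent

// Hypothetical handler that records what it would do instead of scheduling anything.
class EventHandler {
  val log = ArrayBuffer.empty[String]

  // One case per event type, mirroring the shape of the actor's receive method.
  def handle(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId)     => log += s"submit job $jobId"
    case StageCancelled(stageId) => log += s"cancel stage $stageId"
    case AllJobsCancelled        => log += "cancel all jobs"
  }
}
```

Declaring the events under a sealed trait lets the compiler warn when a case is missing from the match, a check that an untyped actor's receive method does not get.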