Shandong University Software Engineering Application and Practice: Spark (9) Code Analysis


2021SC@SDUSC


Contents

Creating and Starting the DAGScheduler


Creating and Starting the DAGScheduler

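DAGScheduler does the preparatory work before tasks are formally handed to TaskSchedulerImpl for submission. This includes creating Jobs, dividing the RDDs in the DAG into Stages, and submitting those Stages. The code that creates the DAGScheduler is as follows: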

@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)

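The data structures of DAGScheduler mainly track the mapping between jobId and stageId, the Stages, the ActiveJobs, and the locations at which the partitions of cached RDDs are stored. The code is as follows: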

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] 
private[scheduler] val stageIdToStage = new HashMap[Int, Stage] 
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage] 
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] 

// Stages we need to run whose parents aren't done 
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now 
private[scheduler] val runningStages = new HashSet[Stage] 
// Stages that must be resubmitted due to fetch failures 
private[scheduler] val failedStages = new HashSet[Stage] 
private[scheduler] val activeJobs = new HashSet[ActiveJob] 
// Contains the locations that each RDD's partitions are cached on 
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] 
private val failedEpoch = new HashMap[String, Long] 

private val dagSchedulerActorSupervisor = 
    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) 

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
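
To make the role of the ID counters and the jobId-to-stageId map more concrete, here is a minimal sketch. It assumes a hypothetical class named JobStageRegistry with helper methods newJob and newStage. These names are illustrative only and are not part of Spark. It shows only how an AtomicInteger hands out unique IDs and how each new stage ID is recorded under its job.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, simplified stand-in for the scheduler's bookkeeping.
// This is an illustration, not Spark's DAGScheduler.
class JobStageRegistry {
  private val nextJobId = new AtomicInteger(0)
  private val nextStageId = new AtomicInteger(0)

  // jobId -> ids of all stages that belong to that job
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

  // Allocates a fresh job id and registers an empty stage set for it.
  def newJob(): Int = {
    val jobId = nextJobId.getAndIncrement()
    jobIdToStageIds(jobId) = new HashSet[Int]
    jobId
  }

  // Allocates a fresh stage id and records it under the given job.
  def newStage(jobId: Int): Int = {
    val stageId = nextStageId.getAndIncrement()
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stageId
    stageId
  }

  // Total number of job ids handed out so far.
  def numTotalJobs: Int = nextJobId.get()
}
```

Because both counters are atomic, IDs stay unique even if several threads request them concurrently. The maps themselves are not thread-safe in this sketch.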

When the DAGScheduler is constructed, it calls the initializeEventProcessActor method to create the DAGSchedulerEventProcessActor. The code is as follows:

private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
    // blocking the thread until supervisor is started, which ensures eventProcessActor is
    // not null before any job is submitted
    implicit val timeout = Timeout(30 seconds)
    // `?` is Akka's ask pattern: it sends the Props and returns a Future of the reply
    val initEventActorReply =
        dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
        asInstanceOf[ActorRef]
}

initializeEventProcessActor()
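
The blocking initialization above can be illustrated without Akka. The following is a rough sketch that swaps the actor ask for a plain scala.concurrent.Future. Worker, Supervisor, and Scheduler are hypothetical names invented for this example. The point it shows is that the constructor waits for the reply, so the field is never null by the time the object is usable.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the event-processing actor.
final class Worker(val name: String)

// Hypothetical stand-in for the supervisor: it creates the worker
// asynchronously and replies with an untyped Future, like an actor ask.
object Supervisor {
  def create(name: String): Future[Any] = Future { new Worker(name) }
}

class Scheduler {
  var worker: Worker = _

  private def initializeWorker(): Unit = {
    val timeout = 30.seconds
    val reply = Supervisor.create("event-processor")
    // Block until the reply arrives so that `worker` is non-null
    // before the constructor returns.
    worker = Await.result(reply, timeout).asInstanceOf[Worker]
  }

  // Runs as part of construction, mirroring the call in the listing above.
  initializeWorker()
}
```

Blocking inside a constructor is usually avoided, but here it trades a short startup delay for the guarantee that no job can be submitted before the event processor exists.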

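Here DAGSchedulerActorSupervisor acts mainly as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As the first listing below shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem calls the preStart method of DAGSchedulerEventProcessActor, and taskScheduler then holds a reference to dagScheduler (see the second listing below). The second listing also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent. DAGSchedulerEventProcessActor takes a different action for each kind of message it receives.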

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
    extends Actor with Logging {

    override val supervisorStrategy =
        OneForOneStrategy() {
            case x: Exception =>
                logError("eventProcesserActor failed; shutting down SparkContext", x)
                try {
                    dagScheduler.doCancelAllJobs()
                } catch {
                    case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
                }
                dagScheduler.sc.stop()
                Stop
        }

    def receive = {
        // Create the child actor from the received Props and reply with its ActorRef
        case p: Props => sender ! context.actorOf(p)
        case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
    }
}

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
    extends Actor with Logging {

    override def preStart() {
        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
    }

    /**
     * The main event loop of the DAG scheduler.
     */
    def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
            listener, properties) =>
            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal,
                callSite, listener, properties)
        case StageCancelled(stageId) =>
            dagScheduler.handleStageCancellation(stageId)
        case JobCancelled(jobId) =>
            dagScheduler.handleJobCancellation(jobId)
        case JobGroupCancelled(groupId) =>
            dagScheduler.handleJobGroupCancelled(groupId)
        case AllJobsCancelled =>
            dagScheduler.doCancelAllJobs()
        case ExecutorAdded(execId, host) =>
            dagScheduler.handleExecutorAdded(execId, host)
        case ExecutorLost(execId) =>
            dagScheduler.handleExecutorLost(execId, fetchFailed = false)
        case BeginEvent(task, taskInfo) =>
            dagScheduler.handleBeginEvent(task, taskInfo)
        case GettingResultEvent(taskInfo) =>
            dagScheduler.handleGetTaskResult(taskInfo)
        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
            dagScheduler.handleTaskCompletion(completion)
        case TaskSetFailed(taskSet, reason) =>
            dagScheduler.handleTaskSetFailed(taskSet, reason)
        case ResubmitFailedStages =>
            dagScheduler.resubmitFailedStages()
    }

    override def postStop() {
        // Cancel any active jobs in postStop hook
        dagScheduler.cleanUpAfterSchedulerStop()
    }
}
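
The dispatch style used in receive can be sketched in plain Scala without Akka. The example below assumes a reduced, hypothetical event set and a hypothetical MiniScheduler that merely records which handler ran. The real events carry more fields and the real handlers do actual scheduling work. It shows only how pattern matching on case classes routes each message to its handler.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, reduced event set; the real events carry more fields.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class JobCancelled(jobId: Int) extends SchedulerEvent
case class ExecutorLost(execId: String) extends SchedulerEvent
case object AllJobsCancelled extends SchedulerEvent

// Hypothetical scheduler that only records which handler was invoked.
class MiniScheduler {
  val log = new ArrayBuffer[String]
  def handleJobSubmitted(jobId: Int): Unit = log += s"submitted $jobId"
  def handleJobCancellation(jobId: Int): Unit = log += s"cancelled $jobId"
  def handleExecutorLost(execId: String): Unit = log += s"lost $execId"
  def doCancelAllJobs(): Unit = log += "cancelled all"
}

// Plays the role of the event-processing actor: one match arm per event type.
class EventProcessor(scheduler: MiniScheduler) {
  def receive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId) => scheduler.handleJobSubmitted(jobId)
    case JobCancelled(jobId) => scheduler.handleJobCancellation(jobId)
    case ExecutorLost(execId) => scheduler.handleExecutorLost(execId)
    case AllJobsCancelled => scheduler.doCancelAllJobs()
  }
}
```

Declaring the event type as a sealed trait lets the compiler warn when a match arm is missing. An untyped actor receive block, as in the listing above, does not get that check.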


Original article: http://outofmemory.cn/zaji/5665172.html
