【Spark Program Execution 1】Building the SparkContext and resolving RDD dependencies (previous post in this series)
【Spark Program Execution 2】Stage division (DAGScheduler)
Stage division of a Spark job is controlled by the DAGScheduler. What does the underlying source code look like?
We start the analysis from the rdd.collect() method:
1. collect() calls SparkContext.runJob(), and the runJob() overload chain ends by calling dagScheduler.runJob(), as sketched below;
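For reference, here is the relevant Spark source, lightly abridged (the chain of SparkContext.runJob() overloads is collapsed into a single step):

// RDD.collect(): runs a job that materializes every partition, then concatenates the results
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

// SparkContext.runJob() (abridged): after cleaning the closure,
// it hands the job off to the DAGScheduler
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  // ...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  // ...
}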
2. Inside DAGScheduler.runJob(), submitJob() is called, and the caller then blocks until the job finishes (see the sketch below);
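An abridged sketch of DAGScheduler.runJob() (details vary slightly across Spark versions): it submits the job and then waits on the returned JobWaiter:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job and get back a JobWaiter handle
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Block until the job completes, then log success or rethrow the failure
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception  // abridged: Spark also appends the caller's stack trace here
  }
}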
3. submitJob() posts a JobSubmitted event onto the event queue:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // Allocate the job id
  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    val clonedProperties = Utils.cloneProperties(properties)
    if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
      clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
    }
    val time = clock.getTimeMillis()
    // Post the job start (and immediate end) events to the listener bus
    listenerBus.post(
      SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
    listenerBus.post(
      SparkListenerJobEnd(jobId, time, JobSucceeded))
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.nonEmpty)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  // Put the job submission event into the event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
4. The JobSubmitted event is placed in the event queue. A dedicated event thread takes events off the queue and calls onReceive(). That method is abstract, so the actual handling happens in a subclass; let's look for it.
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  def post(event: E): Unit = {
    if (!stopped.get) {
      // Before enqueueing, check that the event thread is still alive;
      // that thread is what consumes events from this queue
      if (eventThread.isAlive) {
        eventQueue.put(event)
      } else {
        onError(new IllegalStateException(s"$name has already been stopped accidentally."))
      }
    }
  }

  // The event thread
  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    // The run method keeps taking events off the queue
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }
}
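To see the pattern in isolation, here is a minimal, self-contained version of the same idea (hypothetical names, not Spark code): a daemon thread blocks on a queue and dispatches every event it takes.

import java.util.concurrent.LinkedBlockingDeque

object MiniEventLoopDemo {
  class MiniEventLoop(name: String) {
    private val queue = new LinkedBlockingDeque[String]()
    @volatile private var stopped = false

    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit =
        try {
          // take() blocks until an event arrives, just like EventLoop's eventThread
          while (!stopped) onReceive(queue.take())
        } catch {
          case _: InterruptedException => // woken up by stop(); exit quietly
        }
    }

    def onReceive(event: String): Unit = println(s"[$name] handling: $event")
    def post(event: String): Unit = queue.put(event)
    def start(): Unit = thread.start()
    def stop(): Unit = { stopped = true; thread.interrupt() }
  }

  def main(args: Array[String]): Unit = {
    val loop = new MiniEventLoop("demo-loop")
    loop.start()
    loop.post("JobSubmitted")
    loop.post("StageCancelled")
    Thread.sleep(100) // give the daemon thread time to drain the queue
    loop.stop()
  }
}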
5. The EventLoop subclass DAGSchedulerEventProcessLoop provides the onReceive() method. Its implementation (sketched below) does little more than time the call and delegate to doOnReceive():
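A sketch of that onReceive() implementation, based on the Spark source:

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    // All the real work happens in doOnReceive
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}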
6. onReceive() calls doOnReceive(), which pattern-matches to handle the different event types. The first case is exactly the JobSubmitted event we took off the queue above, and it is handled by the corresponding handler method on the dagScheduler object:
// Pattern match to dispatch the different events: job submission,
// job cancellation, stage cancellation, and so on
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // Job submission event
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // On a match, the corresponding dagScheduler handler method processes the event
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)

  case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
7. The stage division itself happens inside dagScheduler.handleJobSubmitted(). The source is as follows:
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // The final stage is always a ResultStage
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the result stage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        (_: Int, value: Int) => value + 1)
      logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
        s"but only ${e.maxConcurrentTasks} are available. " +
        s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        // Job failed, clear internal data.
        barrierJobIdToNumTasksCheckFailures.remove(jobId)
        listener.jobFailed(e)
        return
      }

    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
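handleJobSubmitted() ends by calling submitStage(finalStage). For context, here is an abridged sketch of submitStage() based on the Spark source (exact shape varies by version): it recursively submits any missing parent stages first, so parent stages always run before their children.

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Find parent stages whose results are not yet available
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        // No missing parents: this stage can run now
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the parents first; this stage waits until they finish
        missing.foreach(submitStage)
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}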
8. The createResultStage method:
private def createResultStage(
    rdd: RDD[_], // the final RDD to process
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Build the ResultStage object
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
Getting or creating the parent stages:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // Collect the RDD's ShuffleDependencies and, for each wide (shuffle)
  // dependency, get or create a ShuffleMapStage
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
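For completeness, an abridged sketch of getShuffleDependencies() based on the Spark source (the exact collection types differ across versions): it walks the dependency graph backwards but stops at the first shuffle dependency on each path, so it returns only the immediate parent shuffles.

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        // A shuffle dependency marks a stage boundary: record it and stop descending
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        // A narrow dependency: keep walking backwards through its parent RDD
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}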
Getting or creating a ShuffleMapStage:
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // Look the stage up in shuffleIdToMapStage first: return it if present,
  // otherwise create the needed stages
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
9. createShuffleMapStage creates the ShuffleMapStage:
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  // Get or create the parent stages; if a further shuffle dependency exists,
  // yet another stage is created, recursively
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // Create the ShuffleMapStage
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

  // Record the stage in the lookup maps
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
      s"shuffle ${shuffleDep.shuffleId}")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
That completes the stage division. From the walkthrough above we can see that Spark has two kinds of stages: ResultStage and ShuffleMapStage. Every Spark job ends in exactly one ResultStage; starting from the final RDD, the scheduler traces the dependency chain backwards and creates a new ShuffleMapStage for every shuffle dependency it encounters, until it reaches RDDs with no shuffle dependencies at all. Therefore, the number of stages in a Spark job equals the number of shuffle dependencies plus one.
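A quick way to check the "shuffles + 1" rule is a small word-count-style job. This is a minimal sketch assuming a local Spark setup; data.txt is a hypothetical input path.

import org.apache.spark.{SparkConf, SparkContext}

object StageCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("stage-count"))

    sc.textFile("data.txt")           // hypothetical input path; narrow so far
      .flatMap(_.split(" "))          // narrow dependency
      .map((_, 1))                    // narrow dependency
      .reduceByKey(_ + _)             // shuffle dependency #1 -> ShuffleMapStage
      .map { case (w, n) => (n, w) }  // narrow dependency
      .groupByKey()                   // shuffle dependency #2 -> another ShuffleMapStage
      .collect()                      // action: the final stage is the ResultStage

    // 2 shuffle dependencies => 2 ShuffleMapStages + 1 ResultStage = 3 stages,
    // which is what the Spark UI shows for this job.
    sc.stop()
  }
}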