[Spark Program Execution 1] Building the SparkContext and Resolving RDD Dependencies
[Spark Program Execution 2] Stage Division (DAGScheduler)
[Spark Program Execution 3] Task Division
Following on from [Spark Program Execution 2] Stage Division: once the stages have been divided, the DAGScheduler submits them, starting from handleJobSubmitted, shown below:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    // exception handling logic (omitted)
  }
  // Job submitted, clear internal data.
  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  // other bookkeeping omitted
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // submit the final stage
  submitStage(finalStage)
}
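For context on when handleJobSubmitted runs: it is invoked by the DAGScheduler's event loop after an action submits a job, roughly along the chain collect() -> SparkContext.runJob -> DAGScheduler.submitJob -> JobSubmitted event -> handleJobSubmitted. A minimal, hedged sketch of the user-side trigger (assuming a running SparkContext named sc, e.g. from spark-shell):

// A minimal sketch: any action submits a job that eventually reaches handleJobSubmitted.
// Assumes a running SparkContext named `sc`.
val rdd = sc.parallelize(1 to 4, 2).map(_ * 2)
rdd.collect()   // collect() calls sc.runJob, which ends up posting JobSubmitted to the DAGScheduler event loop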
To understand how tasks are divided, we continue by tracing the submitStage code:
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Check whether this stage has missing parent stages: if so, submit the parents first;
      // if not, submit the current stage.
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // We have traced back to a stage with no parent stages, i.e. the first stage.
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          // While tracing backwards, if the current stage still has parent stages,
          // keep recursing until a stage with no parents is reached (missing.isEmpty).
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
This method backtracks from finalStage (a ResultStage instance) until no parent stages remain, so that every stage of the job is eventually submitted. For a stage with no missing parents, the DAGScheduler calls submitMissingTasks to hand the tasks over to the TaskScheduler for fine-grained task scheduling. So we now look at DAGScheduler.submitMissingTasks.
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  logDebug("submitMissingTasks(" + stage + ")")
  ......

  // Figure out the indexes of partition ids to compute.
  // Pay special attention to this value: it determines how many tasks the stage is split into.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  // The two stage types are started differently: pattern match on the stage type
  // and notify the output commit coordinator accordingly.
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    // ResultStage: this is the final stage of the job.
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // Compute data locality for each task.
  // Task locality (covered in a later section) means: determine which node holds the data,
  // then run the task on an Executor on that node.
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  ......

  // Key point: tasks.
  // This is where the tasks are generated; the task type depends on the stage type,
  // giving the two cases below.
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      // ShuffleMapStage -> create ShuffleMapTasks
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        // Note partitionsToCompute: it holds the partition indexes of the stage's final RDD,
        // and we create one task per index. In other words, the number of tasks in a stage
        // is determined by the partition count of the stage's last RDD.
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          // Create the task object: one task per partition.
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      // Aside: partitionsToCompute is the result of stage.findMissingPartitions().
      // For a ShuffleMapStage that method is implemented as:
      //
      //   override def findMissingPartitions(): Seq[Int] = {
      //     mapOutputTrackerMaster
      //       .findMissingPartitions(shuffleDep.shuffleId)
      //       .getOrElse(0 until numPartitions)
      //   }

      // ResultTasks are created much like the ShuffleMapStage tasks above.
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          // Create the ResultTask object.
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // If the task list is non-empty, hand it to the TaskScheduler.
  if (tasks.nonEmpty) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage : ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
This method does three main things:
First, it computes the data locality of each task; the detailed logic is in the task-locality source analysis below.
Second, it generates the list of tasks for the stage; the details are in the code comments above, and a short sketch follows this list.
Third, it submits the generated task list to the TaskScheduler.
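To make the second point concrete, here is a minimal, hedged sketch of how partition counts map to task counts. It assumes a running SparkContext named sc; the word-count style pipeline is illustrative and not taken from the article:

// A hedged sketch: the number of tasks in each stage equals the partition count of
// that stage's last RDD. Assumes a running SparkContext named `sc`.
val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)    // 3 partitions
val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 2)            // shuffle down to 2 partitions
counts.collect()
// Expected task breakdown for this job:
//   stage 0 (ShuffleMapStage): 3 ShuffleMapTasks, one per partition of `words.map(...)`
//   stage 1 (ResultStage):     2 ResultTasks, one per partition of `counts`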
Task locality source analysis:
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration. SPARK-695
  // First check whether this (rdd, partition) pair has already been visited.
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  // Cached RDD partition locations take the highest priority.
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  // If there is no cache location, check whether the RDD itself has preferred locations
  // (as input RDDs do) and, if so, build TaskLocations from them.
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  // Walk the RDD's dependencies: for a narrow dependency, recurse into its parent partitions
  // and return the first preferred locations found.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }

  Nil
}
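As a small, hedged illustration of the second branch above (RDD-level placement preferences): input RDDs such as those produced by sc.textFile expose the hosts of their underlying blocks through RDD.preferredLocations. The HDFS path below is a hypothetical example and a SparkContext named sc is assumed:

// A minimal sketch: inspect the placement preferences that getPreferredLocsInternal
// falls back to when a partition is not cached. The path is hypothetical.
val lines = sc.textFile("hdfs:///tmp/input.txt")
lines.partitions.foreach { p =>
  // For a HadoopRDD partition this typically lists the datanodes holding the HDFS block;
  // in local mode or for parallelize-created RDDs it is usually empty.
  println(s"partition ${p.index} prefers: ${lines.preferredLocations(p).mkString(", ")}")
}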
For reference, the ShuffleMapTask source:
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
  with Logging {

  def this(partitionId: Int) {
    this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null)
  }

  @transient private val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.distinct
  }

  // runTask executes the task; based on the task's locality, it runs on an Executor
  // on the corresponding node.
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    // While we use the old shuffle fetch protocol, we use partitionId as mapId in the
    // ShuffleBlockId construction.
    val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
      partitionId
    } else context.taskAttemptId()
    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
  }

  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
}
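One constructor parameter worth noting, in both this class and ResultTask below, is isBarrier, which the DAGScheduler fills in from stage.rdd.isBarrier() when creating the tasks. A hedged sketch of where that flag comes from on the user side (barrier execution mode; this assumes a SparkContext sc with at least as many task slots as partitions, e.g. master local[4]):

// A minimal sketch of barrier execution mode, the source of the isBarrier flag above.
import org.apache.spark.BarrierTaskContext

val barrierRdd = sc.parallelize(1 to 8, 4).barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()          // all 4 tasks of this stage must reach this point before any continues
  iter.map(_ * 2)
}
barrierRdd.collect()     // the tasks of this stage are created with isBarrier = true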
ResultTask source:
private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    locs: Seq[TaskLocation],
    val outputId: Int,
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[U](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.distinct
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}
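To connect the func applied in runTask back to user code: actions pass a per-partition function to SparkContext.runJob, and each ResultTask applies it to the iterator of its own partition. A minimal, hedged sketch (assuming a SparkContext named sc; the exact split produced by parallelize may vary):

// A minimal sketch: each ResultTask runs the per-partition function supplied by the action.
val data = sc.parallelize(1 to 10, 2)
// Two ResultTasks are created, one per partition; each applies `it.sum` to its own iterator.
val partialSums: Array[Int] = sc.runJob(data, (it: Iterator[Int]) => it.sum)
println(partialSums.mkString(", "))   // typically "15, 40" with the default range slicing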