上一节我们分析到,Spark通过YARN client向RM提交并启动了一个ApplicationMaster,接下来我们看看其中的细节。
/**
 * JVM entry point of the ApplicationMaster process.
 *
 * Hands the logger to SignalUtils, parses the command-line arguments, builds the
 * ApplicationMaster and exits the JVM with the code returned by `run()`.
 *
 * @param args raw command-line arguments, wrapped in ApplicationMasterArguments
 */
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}

/**
 * Executes `runImpl()` inside `doAsUser` and then reports `exitCode`.
 *
 * @return the value of `exitCode` once `runImpl()` has returned
 */
final def run(): Int = {
  doAsUser {
    runImpl()
  }
  exitCode
}

/**
 * Main body of the ApplicationMaster.
 *
 * In order: sets cluster-mode system properties (cluster mode only), installs the caller
 * context, registers a shutdown hook that finishes/unregisters the application, then runs
 * either the driver (cluster mode) or the executor launcher. The metrics system, if one
 * was created, is reported and stopped in the `finally` clause.
 *
 * NOTE(review): both `catch` handlers are elided (`...`) in this excerpt, so the error
 * handling cannot be described from here.
 */
private def runImpl(): Unit = {
  try {
    val appAttemptId = client.getAttemptId()

    // Stays None unless running in cluster mode.
    var attemptID: Option[String] = None

    if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty("spark.ui.port", "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty("spark.submit.deployMode", "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

      attemptID = Option(appAttemptId.getAttemptId.toString)
    }

    new CallerContext(
      "APPMASTER",
      sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString),
      attemptID).setCurrentContext()

    logInfo("ApplicationAttemptId: " + appAttemptId)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      // True once the current attempt number has reached the configured maximum.
      val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir()
        }
      }
    }

    // Cluster mode runs the driver in this process; otherwise only the executor launcher runs.
    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {... } finally {
    try {
      // Report and stop the metrics system if one was created.
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {...}
  }
}
我们这里采用的是cluster模式,所以最后运行runDriver:
/**
 * Cluster-mode path: starts the user class in its own thread, waits for it to publish a
 * SparkContext, registers the AM, creates the allocator and then blocks until the user
 * class thread terminates.
 *
 * If waiting for the SparkContext ends in a SparkException caused by a TimeoutException,
 * the application is finished as FAILED with EXIT_SC_NOT_INITED. `resumeDriver()` is
 * always invoked on the way out.
 */
private def runDriver(): Unit = {
  addAmIpFilter(None)
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(
      sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))

    // Sanity check; should never happen in normal operation, since sc should only be null
    // if the user app did not create a SparkContext.
    if (sc == null) {
      throw new IllegalStateException("User did not initialize spark context!")
    }

    rpcEnv = sc.env.rpcEnv
    val userConf = sc.getConf
    val driverHost = userConf.get("spark.driver.host")
    val driverPort = userConf.get("spark.driver.port").toInt
    registerAM(driverHost, driverPort, userConf, sc.ui.map(ui => ui.webUrl))

    val schedulerRef = rpcEnv.setupEndpointRef(
      RpcAddress(driverHost, driverPort),
      YarnSchedulerBackend.ENDPOINT_NAME)
    createAllocator(schedulerRef, userConf)

    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
          "Please check earlier log output for errors. Failing the application.")
      finish(
        FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
runDriver中第一个关键步骤就是startUserApplication,它会单独启动一个线程来运行我们自己编写的类,也就是提交Spark应用时指定的class。
然后就是通过createAllocator向RM申请资源,具体由YarnAllocator来实现:
/**
 * Builds the allocator for this application, performs a first allocation round, and
 * starts the "applicationMaster" metrics system and the reporter thread.
 *
 * @param driverRef  RPC reference to the driver endpoint
 * @param _sparkConf configuration handed to the allocator and used for the metrics namespace
 */
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()

  val driverAddress = driverRef.address
  val driverEndpoint = RpcEndpointAddress(
    driverAddress.host,
    driverAddress.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val driverUrl = driverEndpoint.toString

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  credentialRenewer.foreach { renewer => renewer.setDriverRef(driverRef) }

  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()

  // The source prefix is the configured metrics namespace, or the application id if unset.
  val amMetrics = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
  val sourcePrefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  amMetrics.registerSource(new ApplicationMasterSource(sourcePrefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  amMetrics.start(false)
  metricsSystem = Some(amMetrics)

  reporterThread = launchReporterThread()
}
这里面会通过allocateResources向RM申请资源:
/**
 * One allocation round, executed under this object's lock: refreshes the outstanding
 * resource requests, calls `amClient.allocate`, records the cluster node count and hands
 * any newly allocated containers to `handleAllocatedContainers`.
 */
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  val response = amClient.allocate(progressIndicator)
  val granted = response.getAllocatedContainers()

  allocatorBlacklistTracker.setNumClusterNodes(response.getNumClusterNodes)

  if (!granted.isEmpty) {
    handleAllocatedContainers(granted.asScala)
  }

  // NOTE(review): the completed-container statuses are fetched but not used anywhere in
  // this excerpt; confirm whether their handling was trimmed from the listing.
  val completedContainers = response.getCompletedContainersStatuses()
}
这里首先通过updateResourceRequests,根据我们传入的配置更新需要申请的executor的内存和CPU容量;然后调用amClient.allocate,这相当于向RM发送了一次心跳,看当前RM是否有已经分配给我们的可用Container,这一步本身并没有启动任何Container;最后通过handleAllocatedContainers过滤掉不符合要求的Container:
/**
 * Matches the containers granted in one allocate call against outstanding requests and
 * launches executors on those that match.
 *
 * Matching is attempted in three passes: by the container's host, then by the rack
 * resolved for that host, then by ANY_HOST. Containers that match nothing are released.
 *
 * @param allocatedContainers containers returned by the allocate call
 * @throws InterruptedException if the calling thread is interrupted while waiting for
 *                              the rack resolver thread
 */
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // Pass 1: match by the exact host of each container.
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Pass 2: match by rack. Resolution runs on a separate daemon thread and this thread
  // waits for it to finish.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          // Capture everything so the failure can be rethrown on the calling thread.
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      // Propagate the interrupt to the resolver thread instead of swallowing it.
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    // Surface any failure from the resolver thread; previously it was captured but dropped.
    exception.foreach { e => throw e }
  }

  // Pass 3: whatever is left may still satisfy a request with no locality constraint.
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }

  runAllocatedContainers(containersToUse)

  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}
接下来在runAllocatedContainers中,在符合要求的Container上启动executor:
/**
 * Launches an executor in each of the given containers, as long as the number of running
 * executors is below `targetNumExecutors`.
 *
 * Every container consumes a fresh executor id. When `launchContainers` is true the launch
 * is submitted to `launcherPool`; otherwise only the bookkeeping is updated, so that
 * `numExecutorsStarting` does not stay incremented.
 *
 * @param containersToUse containers that were matched to outstanding requests
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString

    // Records the executor as running and clears its "starting" mark.
    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(new Runnable {
          override def run(): Unit = {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources
              ).run()
              updateInternalState()
            } catch {
              case e: Throwable =>
                // The launch did not complete, so undo the "starting" mark for any failure.
                numExecutorsStarting.decrementAndGet()
                if (NonFatal(e)) {
                  logError(s"Failed to launch executor $executorId on container $containerId", e)
                  // Hand the container back so it is not held without an executor.
                  amClient.releaseAssignedContainer(containerId)
                } else {
                  throw e
                }
            }
          }
        })
      } else {
        // Nothing is launched, but the counter incremented above must still be settled;
        // an empty branch here would leave numExecutorsStarting permanently too high.
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}
通过线程池来为每个分配到的Container提交启动任务,并启动Container,这些操作在ExecutorRunnable中完成:
/**
 * Creates, initialises and starts the NodeManager client, then starts the container.
 */
def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}

/**
 * Builds the ContainerLaunchContext (local resources, environment, security tokens,
 * launch command, ACLs and, when the shuffle service is enabled, its secret) and asks
 * the NodeManager client to start the container.
 *
 * @return the map returned by `nmClient.startContainer`
 * @throws Exception any failure from `nmClient.startContainer` is logged and rethrown
 */
def startContainer(): java.util.Map[String, ByteBuffer] = {
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    .asInstanceOf[ContainerLaunchContext]
  val env = prepareEnvironment().asJava

  ctx.setLocalResources(localResources.asJava)
  ctx.setEnvironment(env)

  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  // Only the first getLength() bytes of the backing array are valid data; wrapping the
  // whole array would also expose unused buffer capacity as token bytes.
  ctx.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()))

  val commands = prepareCommand()
  ctx.setCommands(commands.asJava)
  ctx.setApplicationACLs(
    YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

  if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
    // A missing secret key is sent as an empty buffer.
    val secretBytes = Option(securityMgr.getSecretKey())
      .map(secret => JavaUtils.stringToBytes(secret))
      .getOrElse(ByteBuffer.allocate(0))
    ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
  }

  try {
    nmClient.startContainer(container.get, ctx)
  } catch {
    // Record which container failed, then let the caller decide how to react.
    case ex: Exception =>
      logError(s"Exception while starting container ${container.get.getId}", ex)
      throw ex
  }
}
在startContainer中,我们可以发现,其启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend,详细内容我们下一节再讲吧。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)