Explanation:
1. The Master asks the Worker to launch the driver and the executors.
2. How the Worker launches a driver: the Worker creates a DriverRunner, which runs a dedicated thread that launches the driver process and then manages it (a sketch of this launch-and-monitor pattern follows this list).
3. How the Worker launches an executor: the Worker creates an ExecutorRunner, which likewise runs a dedicated thread that launches the executor process and then manages it.
4. For the driver, the runner first creates the driver's working directory, assembles the driver launch command, and starts the driver with a ProcessBuilder.
5. For the executor, the runner first creates the executor's working directory, assembles the executor launch command, and starts the executor with a ProcessBuilder; the executor then finds its driver and registers itself back with it (reverse registration), after which the executor can start working.
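Before going into the source, here is a minimal, self-contained sketch of the pattern both DriverRunner and ExecutorRunner follow: a dedicated thread builds the child command with ProcessBuilder, starts it, blocks on waitFor(), and reports the exit code. The class name MiniRunner and the onExit callback are made up for illustration; this is not Spark's actual code:

import java.io.File

// Minimal sketch of the Worker's runner pattern: one thread per child process.
class MiniRunner(command: Seq[String], workDir: File, onExit: Int => Unit) {
  def start(): Unit = {
    new Thread("MiniRunner for " + command.head) {
      override def run(): Unit = {
        val builder = new ProcessBuilder(command: _*)
        builder.directory(workDir)                        // run inside the working directory
        builder.redirectOutput(new File(workDir, "stdout")) // like the real runners, logs go
        builder.redirectError(new File(workDir, "stderr"))  // to files in the work dir
        val process = builder.start()        // fork the driver/executor process
        val exitCode = process.waitFor()     // block until the child exits
        onExit(exitCode)                     // report state, like DriverStateChanged
      }
    }.start()
  }
}

For example, new MiniRunner(Seq("echo", "hello"), new File("."), code => println(s"exited with $code")).start() runs echo in the current directory and prints its exit code.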
Source-code analysis: how the Master launches and manages the driver process:
Step 1: schedule() calls launchDriver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal bookkeeping; the worker's used
  // memory and CPU counts are increased by what the driver needs
  worker.addDriver(driver)
  // Record the worker in the driver's in-memory bookkeeping
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's endpoint
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
Step 2: the Worker handles the LaunchDriver message
// Launch the driver
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner( // the DriverRunner thread
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start() // launch the driver process
  // Track the CPU and memory now in use
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
Step 3: DriverRunner.start(), invoked in step 2
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // prepare driver jars and run driver
        // the actual run logic is implemented in this class
        val exitCode = prepareAndRunDriver()

        // set final state depending on if forcibly killed and process exit code
        // map the exit code returned by prepareAndRunDriver() to a final state
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // notify worker of final driver state, possible exception
      // send DriverStateChanged to the worker (see the companion article on Spark source: state changes)
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
Stepping into prepareAndRunDriver:
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()          // create the driver's working directory
  val localJarFilename = downloadUserJar(driverDir) // download the user jar into that directory

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add ability to submit multiple jars they should also be added here
  // Pass in the driver's launch command, the memory it needs, and so on
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
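The substituteVariables function above is applied to every argument of the command, swapping exact-match placeholder tokens like {{WORKER_URL}} and {{USER_JAR}} for concrete values just before the ProcessBuilder is assembled. A small sketch of the idea, with hypothetical bindings:

// Sketch: expand exact-match placeholders in a command's argument list.
def substituteAll(args: Seq[String], bindings: Map[String, String]): Seq[String] =
  args.map(arg => bindings.getOrElse(arg, arg)) // unknown arguments pass through unchanged

val expanded = substituteAll(
  Seq("--worker-url", "{{WORKER_URL}}", "--jar", "{{USER_JAR}}"),
  Map("{{WORKER_URL}}" -> "spark://Worker@host:7078",
      "{{USER_JAR}}"   -> "/tmp/driver-work/app.jar"))
// expanded == Seq("--worker-url", "spark://Worker@host:7078", "--jar", "/tmp/driver-work/app.jar")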
Now into runDriver:
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    // stdout and stderr hold the process's output log and error log
    val stdout = new File(baseDir, "stdout")
    // redirect the process's output stream into the stdout file
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, StandardCharsets.UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  // run the command, retrying if supervised
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
And into runCommandWithRetry:
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  var exitCode = -1
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5
  // whether another attempt should be made
  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return exitCode }
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    // the process was started above by command.start(); waitFor() blocks until it exits
    exitCode = process.get.waitFor()

    // check if attempting another run
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }

  exitCode // this exit code is exactly what gets returned
}
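To make the supervise back-off concrete: waitSeconds starts at 1 and doubles after every quick failure, but any run that lasted longer than successfulRunDuration (5 seconds) resets it to 1 before sleeping. A tiny sketch of just that policy, detached from the process handling (backoff is a hypothetical helper, not in Spark):

// Sketch of the back-off policy: returns (secondsToSleep, nextWaitSeconds).
def backoff(waitSeconds: Int, runMillis: Long): (Int, Int) = {
  val sleep = if (runMillis > 5000L) 1 else waitSeconds // a long-lived run resets the back-off
  (sleep, sleep * 2)                                    // the next failure waits twice as long
}

// Three immediate crashes: sleeps of 1 s, 2 s, 4 s.
// A crash after a 10 s run: the next sleep drops back to 1 s.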
Source-code analysis: how the Master launches and manages executor processes:
Step 1: schedule() calls startExecutorsOnWorkers
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Iterate over the ApplicationInfos in waitingApps, keeping only apps that still need cores:
  // the cores the application asked for, minus the cores its executors already use, must be > 0
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // Keep only ALIVE workers with enough free memory and cores, sorted by free cores, descending
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Schedule executors on the workers
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // With the spread-out strategy, the executors of each application are distributed evenly
    // across the workers. For example, 20 cpu cores over 10 workers: the loop visits the
    // workers twice, giving each worker one core per pass, so each worker ends up with 2 cores.
    // In short: spread evenly.
    // Without spread-out, each application is packed onto as few workers as possible.
    // For example, with 10 workers of 10 cpus each and an application that needs 20 cores,
    // only 2 workers are used, each filled with 10 cores; the next app goes to the next worker.
    // In short: pack onto the fewest workers with the most resources.

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    // Walk the workers and allocate on each one that was assigned at least one core
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
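The spread-out versus pack behavior described in the comments can be reproduced with a toy allocator. This is a simplified sketch (one core at a time, no memory constraints, and it assumes toAssign does not exceed the total free cores); the real logic lives in scheduleExecutorsOnWorkers:

// Toy core placement: round-robin across workers (spread out) or fill each worker first.
def assignCores(toAssign: Int, freeCores: Array[Int], spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var left = toAssign
  var pos = 0
  while (left > 0) {
    if (assigned(pos) < freeCores(pos)) { // this worker still has a free core
      assigned(pos) += 1
      left -= 1
      if (spreadOut) pos = (pos + 1) % freeCores.length // spread: move to the next worker
    } else {
      pos = (pos + 1) % freeCores.length // worker is full, try the next one
    }
  }
  assigned
}

// assignCores(20, Array.fill(10)(10), spreadOut = true)  -> 2 cores on every worker
// assignCores(20, Array.fill(10)(10), spreadOut = false) -> 2 workers with 10 cores each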
Step 2: allocateWorkerResourceToExecutors, called from step 1
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  // For each executor, addExecutor records it in the application's internal bookkeeping and
  // creates an ExecutorDesc that captures how many cpu cores that executor was given.
  // Because allocation follows the scheduling mechanism above, the final number of executors
  // and the cores each one gets may differ from what was requested. For example, asking for
  // 3 executors with 3 cpus each on 9 workers can, under the spread-out strategy, end up as
  // one executor per worker with one cpu core each.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
Step 3: launchExecutor, called from step 2
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's in-memory bookkeeping
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker's endpoint
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Send an ExecutorAdded message to the driver of the executor's application
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
Step 4: the Worker's case for the LaunchExecutor message
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs

      // Create the ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      // Add it to the local cache
      executors(appId + "/" + execId) = manager
      // Start the ExecutorRunner
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Tell the master that the executor's state changed
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
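Two directory layers are created here: the executor's working directory workDir/appId/execId (the process's cwd and where stdout/stderr land), and per-application local dirs that reach the executor through SPARK_EXECUTOR_DIRS. A sketch of the first step using only plain JDK calls (paths are illustrative; Spark uses its own helpers):

import java.io.{File, IOException}

// Sketch: create a per-executor working directory, failing loudly like the Worker does.
def createExecutorDir(workDir: File, appId: String, execId: Int): File = {
  val executorDir = new File(workDir, appId + "/" + execId)
  // mkdirs() returns false if the directory could not be created
  if (!executorDir.isDirectory && !executorDir.mkdirs()) {
    throw new IOException("Failed to create directory " + executorDir)
  }
  executorDir
}

// e.g. createExecutorDir(new File("/tmp/spark-work"), "app-20240101-0001", 0)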
Step 5: the ExecutorRunner created in step 4
Looking at its start method:
private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}
Into fetchAndRunExecutor():
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    // Build a ProcessBuilder for the executor command
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Run inside the executor directory created earlier
    builder.directory(executorDir)
    // Set some environment variables
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    // in the executor directory created above
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    // the process was started above by builder.start(); waitFor() blocks until it exits
    val exitCode = process.waitFor()
    // record the resulting state
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // Tell the worker that the executor's state changed
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
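Note how the log locations travel to the executor purely through environment variables: the Worker computes a base URL (proxied or direct, depending on spark.ui.reverseProxy) and the executor later advertises SPARK_LOG_URL_STDOUT/STDERR in the UI. A condensed sketch of that environment setup, with illustrative host and id values:

// Sketch: hand log URLs to a child process via its environment.
val builder = new ProcessBuilder("java", "-version")
val reverseProxy = false // in Spark this comes from spark.ui.reverseProxy
val baseUrl =
  if (reverseProxy) "/proxy/worker-1/logPage/?appId=app-1&executorId=0&logType="
  else "http://worker-host:8081/logPage/?appId=app-1&executorId=0&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", baseUrl + "stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", baseUrl + "stdout")
// The child process can then read these, e.g. sys.env("SPARK_LOG_URL_STDOUT") in Scala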
Tip: in flows like this that involve messaging between processes, the Master and the Worker often define classes or message cases with the same name but different handling logic on each side; reading the two sides against each other makes the protocol easier to follow.