Spark Source Code: Worker Internals


Overview:
1. The master asks the worker to launch drivers and executors.
2. To launch a driver, the worker creates a DriverRunner, which spawns a dedicated thread that launches the driver process and then manages it (see the sketch after this list).
3. To launch an executor, the worker creates an ExecutorRunner, which likewise spawns a dedicated thread that launches the executor process and then manages it.
4. For a driver, the runner first creates the driver's working directory, builds the launch command, and starts the driver with a ProcessBuilder.
5. For an executor, the runner first creates the executor's working directory, builds the launch command, and starts the executor with a ProcessBuilder; the executor then finds its driver and registers itself back with it, after which it can start working.
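
Both runners follow the same launch-and-monitor pattern: build a java.lang.ProcessBuilder, fork the child process, and block in a dedicated thread until it exits. A minimal, self-contained sketch of that pattern (ProcessRunner and its onExit callback are invented for illustration; they are not Spark classes):

import java.io.File

// Launch a child process on its own thread, block until it exits, then
// report the exit code: the same shape as DriverRunner/ExecutorRunner.
class ProcessRunner(command: Seq[String], workDir: File, onExit: Int => Unit) {
  def start(): Unit = {
    new Thread("process-runner") {
      override def run(): Unit = {
        val builder = new ProcessBuilder(command: _*)
        builder.directory(workDir)       // run inside the working directory
        val process = builder.start()    // fork the child process
        val exitCode = process.waitFor() // block until the child exits
        onExit(exitCode)                 // e.g. report a state change
      }
    }.start()
  }
}

object ProcessRunnerDemo extends App {
  new ProcessRunner(Seq("echo", "hello"), new File("."),
    code => println(s"child exited with $code")).start()
  Thread.sleep(500) // crude wait so the demo JVM does not exit first
}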

Source walkthrough: the master launches and manages the driver process
Step 1: schedule() calls launchDriver()

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal bookkeeping; this also adds the
  // driver's required memory and cores to the worker's used totals
  worker.addDriver(driver)
  // Record the worker in the driver's in-memory state
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's endpoint
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

Step 2: the worker handles the LaunchDriver message

// Launch the driver
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner( // DriverRunner manages the driver in a dedicated thread
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()  // Start the driver process

  // Account for the cores and memory now in use
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}

Step 3: DriverRunner.start(), invoked in step 2

  private[worker] def start() = {
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // prepare driver jars and run driver
          // prepareAndRunDriver() is implemented in this same class
          val exitCode = prepareAndRunDriver()

          // set final state depending on if forcibly killed and process exit code
          // Turn the exit code returned by prepareAndRunDriver() into a final driver state
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // notify worker of final driver state, possible exception
        // Send DriverStateChanged to the worker (see the article "Spark Source Code: state changes")
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }
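
ShutdownHookManager is Spark's own wrapper around JVM shutdown hooks. The idea: if the worker JVM dies while the driver is still running, kill the driver; once the driver exits on its own, deregister the hook. A rough sketch with the plain JDK API (DriverShutdownHookSketch and killDriver are hypothetical stand-ins):

object DriverShutdownHookSketch {
  // Hypothetical stand-in for DriverRunner.kill()
  @volatile var driverProcess: Option[Process] = None
  def killDriver(): Unit = driverProcess.foreach(_.destroy())

  def main(args: Array[String]): Unit = {
    val hook = new Thread(() => {
      println("Worker shutting down, killing driver")
      killDriver()
    })
    // Runs if this JVM shuts down while the driver is still alive
    Runtime.getRuntime.addShutdownHook(hook)

    // ... launch the driver process here and waitFor() it ...

    // Driver finished on its own, so the hook is no longer needed
    Runtime.getRuntime.removeShutdownHook(hook)
  }
}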

Stepping into prepareAndRunDriver:

private[worker] def prepareAndRunDriver(): Int = {
    val driverDir = createWorkingDirectory() // Create the driver's working directory
    val localJarFilename = downloadUserJar(driverDir) // Download the user jar into that directory

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // TODO: If we add ability to submit multiple jars they should also be added here
    // Pass in the driver's launch command, required memory size, and related settings
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }
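
substituteVariables is applied to every argument of the driver command before the ProcessBuilder is built: placeholder tokens written into the submitted command get replaced with worker-local values. Its effect in isolation, with made-up example values:

// Example values; the real ones come from the worker and downloadUserJar()
val workerUrl = "spark://Worker@192.168.0.10:35000"
val localJarFilename = "/opt/spark/work/driver-example/app.jar"

def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}"   => localJarFilename
  case other            => other
}

// An illustrative argument list before and after substitution:
val rawArgs  = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main")
val resolved = rawArgs.map(substituteVariables)
// resolved == Seq("spark://Worker@192.168.0.10:35000",
//                 "/opt/spark/work/driver-example/app.jar",
//                 "com.example.Main")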

Stepping into runDriver:

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)
    def initialize(process: Process): Unit = {
      // Redirect stdout and stderr to files
      // stdout and stderr are the process's output log and error log
      val stdout = new File(baseDir, "stdout")
      // Pump the process's stdout into the file
      CommandUtils.redirectStream(process.getInputStream, stdout)

      val stderr = new File(baseDir, "stderr")
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
    // Run the command, retrying under supervision if it fails
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }
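
CommandUtils.redirectStream pumps a process stream into a log file on a background thread, so reading the child's output never blocks the thread that monitors the process. A simplified sketch of the same idea (the real implementation's buffering and error handling differ):

import java.io.{File, FileOutputStream, InputStream}

// Copy a child process's output stream into a log file on a daemon thread
def redirectStream(in: InputStream, file: File): Unit = {
  val out = new FileOutputStream(file, true) // append mode
  val pump = new Thread("redirect to " + file.getName) {
    override def run(): Unit = {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {   // -1 means the child closed the stream (exited)
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.close()
    }
  }
  pump.setDaemon(true) // never keep the worker JVM alive just for logging
  pump.start()
}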

Stepping into runCommandWithRetry:

  private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
    var exitCode = -1
    // Time to wait between submission retries.
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString(""", "" "", """))

      synchronized {
        if (killed) { return exitCode }
        process = Some(command.start())
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()
      // Block here until the process exits; the process itself was started
      // above by command.start()
      exitCode = process.get.waitFor()

      // check if attempting another run
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }

    exitCode // the exit code is what gets returned
  }
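
The retry policy: after every failed run the wait doubles, and once a run has survived longer than successfulRunDuration seconds the wait resets to 1 s. The schedule in isolation, with a hypothetical nextWait helper (runSeconds is how long the previous run lasted):

val successfulRunDuration = 5 // seconds; a longer run resets the back-off
var waitSeconds = 1

def nextWait(runSeconds: Long): Int = {
  if (runSeconds > successfulRunDuration) waitSeconds = 1 // stable run: reset
  val w = waitSeconds
  waitSeconds *= 2 // exponential back-off for the next failure
  w
}

// A driver that crashes instantly four times, then runs 10 s, then crashes:
println(Seq(0L, 0L, 0L, 0L, 10L, 0L).map(nextWait)) // List(1, 2, 4, 8, 1, 2)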

Source walkthrough: the master launches and manages executor processes
Step 1: schedule() calls startExecutorsOnWorkers()

 private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // Iterate over the ApplicationInfos in waitingApps, considering only apps
    // whose defined core count minus the cores already used by their executors
    // on workers is still greater than 0 (coresLeft > 0)
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      // Keep only workers in the ALIVE state, sorted by free cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      // Decide how many cores to assign on each usable worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Spread-out mode distributes each application's executors evenly
      // across the workers: e.g. with 20 cores over 10 workers the loop
      // passes over the workers twice, one core per worker per pass, so each
      // worker ends up with 2 cores. In short: spread evenly.
      // (A simplified sketch of this round-robin follows after this snippet.)
      //
      // Non-spread-out mode packs each application onto as few workers as
      // possible: with 10 workers of 10 cores each and an application asking
      // for 20 cores, only 2 workers are used, each filled with all 10 cores,
      // and later applications go to the remaining workers.
      // In short: consolidate onto the fewest workers.

      // Now that we've decided how many cores to allocate on each worker,
      // let's allocate them: iterate over the workers that were assigned cores
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }
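
A simplified sketch of the spread-out round-robin described in the comments above, ignoring the memory limits, coresPerExecutor bundling, and other checks the real scheduleExecutorsOnWorkers performs:

// Hand out one core at a time, round-robin across the usable workers,
// until the application's demand (or the cluster's free capacity) is met
def spreadOut(coresNeeded: Int, freeCores: Array[Int]): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var toAssign = math.min(coresNeeded, freeCores.sum)
  var pos = 0
  while (toAssign > 0) {
    if (freeCores(pos) - assigned(pos) > 0) { // worker still has a free core
      assigned(pos) += 1
      toAssign -= 1
    }
    pos = (pos + 1) % freeCores.length // move on to the next worker
  }
  assigned
}

// 20 cores over 10 workers with 10 free cores each: 2 cores per worker
println(spreadOut(20, Array.fill(10)(10)).toList) // List(2, 2, ..., 2)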

Step 2: allocateWorkerResourceToExecutors(), invoked in step 1

 
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    // Add the executor to the application's internal bookkeeping and create an
    // ExecutorDesc recording how many cores were assigned to it.
    // Note that the final number of executors, and the cores each one gets,
    // may differ from what was requested: e.g. asking for 3 executors with
    // 3 cores each on 9 workers ends up, under this algorithm, as one
    // executor per worker with a single core each.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
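
The two lines computing numExecutors and coresToAssign decide how one worker's assigned cores are split into executors; a small sketch working through both branches:

// How assignedCores on one worker is split into executors
def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
  val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  (numExecutors, coresToAssign) // (how many executors, cores for each)
}

println(split(8, Some(2))) // (4,2): four executors with 2 cores each
println(split(8, None))    // (1,8): one executor grabbing all 8 cores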

Step 3: launchExecutor(), invoked in step 2

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's in-memory bookkeeping
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker's endpoint
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Send an ExecutorAdded message to the driver of the executor's application
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

Step 4: the worker handles the LaunchExecutor message

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs

          // Create the ExecutorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)	

          // Add it to the worker's local executors map
          executors(appId + "/" + execId) = manager
          // Start the ExecutorRunner
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          // Tell the master that the executor's state changed
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
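
For orientation, the directories this handler sets up look roughly like the layout below (the appId shown is illustrative; the stdout/stderr files are written later, in step 5):

work/                              <- the worker's workDir
  app-20221216120000-0000/         <- appId
    0/                             <- execId, the executorDir above
      stdout                       <- executor output log (written in step 5)
      stderr                       <- executor error log (written in step 5)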

Step 5: ExecutorRunner.start(), invoked in step 4

private[worker] def start() {

    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

Stepping into fetchAndRunExecutor():

 
  private def fetchAndRunExecutor() {
    try {
      // Launch the process
      // Build the ProcessBuilder for the executor command
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")
      // Run the process inside the executor directory created in step 4
      builder.directory(executorDir)
      // Set environment variables for the child process
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      process = builder.start()
      val header = "Spark Executor Command: %sn%snn".format(
        formattedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      // Redirect them into files inside the executor directory
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code

      // Block until the process exits; it was started above by builder.start()
      val exitCode = process.waitFor()
      // Record the final state
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      // Send ExecutorStateChanged to the worker
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }

Tip: in message-passing flows like this one, the Master and Worker often handle messages with the same class name but different logic on each side; reading the two handlers side by side makes the protocol easier to follow.
