Spark Notes: Task Submission Source Code Walkthrough, Part 2

Structure of Inbox:

private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.
	
  // The message queue
  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  
  @GuardedBy("this")
  private var stopped = false

  
  @GuardedBy("this")
  private var enableConcurrent = false

  
  @GuardedBy("this")
  private var numActiveThreads = 0

  // onStart should be the first message to process
  inbox.synchronized {
    // OnStart is the first message placed in the executor endpoint's inbox
    messages.add(OnStart)
  }
  ......
}

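Note: in RPC communication it is the endpoint, rather than each individual message, that has a life cycle. The source comment on RpcEndpoint describes it as: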
constructor -> onStart -> receive* -> onStop


private[spark] trait RpcEndpoint {......}
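The life cycle above can be sketched with a toy inbox. This is only an illustration under simplifying assumptions: `MiniInbox`, `MiniEndpoint`, `Msg`, `OneWay` and `LifecycleDemo` are hypothetical names, and messages are drained on the calling thread, whereas Spark's real Inbox has more message types and is processed by dispatcher threads.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types, loosely named after Spark's InboxMessage cases.
sealed trait Msg
case object OnStart extends Msg
case object OnStop extends Msg
final case class OneWay(content: Any) extends Msg

// Hypothetical endpoint trait exposing the three life-cycle hooks.
trait MiniEndpoint {
  def onStart(): Unit
  def receive(content: Any): Unit
  def onStop(): Unit
}

// A simplified inbox: OnStart is enqueued in the constructor, so it is
// always the first message the endpoint sees.
class MiniInbox(endpoint: MiniEndpoint) {
  private val messages = new java.util.LinkedList[Msg]()
  private var stopped = false

  this.synchronized { messages.add(OnStart) }

  // Enqueue a message; anything posted after stop() is dropped.
  def post(content: Any): Unit = this.synchronized {
    if (!stopped) messages.add(OneWay(content))
  }

  // Mark the inbox as stopped and enqueue OnStop as the final message.
  def stop(): Unit = this.synchronized {
    if (!stopped) {
      stopped = true
      messages.add(OnStop)
    }
  }

  // Drain the queue on the calling thread, dispatching each message in order.
  def process(): Unit = {
    var msg = this.synchronized(messages.poll())
    while (msg != null) {
      msg match {
        case OnStart   => endpoint.onStart()
        case OneWay(c) => endpoint.receive(c)
        case OnStop    => endpoint.onStop()
      }
      msg = this.synchronized(messages.poll())
    }
  }
}

object LifecycleDemo {
  // Returns the order in which the hooks fire for a small scenario.
  def run(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val inbox = new MiniInbox(new MiniEndpoint {
      def onStart(): Unit = log += "onStart"
      def receive(content: Any): Unit = log += s"receive($content)"
      def onStop(): Unit = log += "onStop"
    })
    inbox.post("a")
    inbox.post("b")
    inbox.stop()
    inbox.post("dropped") // ignored because the inbox is already stopped
    inbox.process()
    log.toList
  }
}
```

Calling `LifecycleDemo.run()` yields the hooks in the order onStart, receive, receive, onStop, which matches the `constructor -> onStart -> receive* -> onStop` sequence.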

For the executor, the OnStart message is handled by the onStart method of the CoarseGrainedExecutorBackend object:

 override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    // Obtain a reference to the driver endpoint
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      // and ask the driver to register this executor
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
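The lookup-then-ask chain in onStart can be reproduced with plain Scala futures. The sketch below is not Spark code: `DriverRef`, `lookupDriver`, `register` and `registerAndWait` are hypothetical names, the same-thread execution context is a hand-written stand-in for `ThreadUtils.sameThread`, and `transform` replaces `onComplete` so that the outcome can be returned and inspected.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RegistrationHandshake {
  // Runs callbacks on the thread that completes the future
  // (an assumed stand-in for ThreadUtils.sameThread).
  private val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      override def execute(r: Runnable): Unit = r.run()
    })

  // Hypothetical stand-in for a reference to the driver endpoint.
  trait DriverRef {
    def askRegister(executorId: String): Future[Boolean]
  }

  // Hypothetical lookup, playing the role of asyncSetupEndpointRefByURI.
  def lookupDriver(url: String, driver: DriverRef): Future[DriverRef] =
    if (url.startsWith("spark://")) Future.successful(driver)
    else Future.failed(new IllegalArgumentException(s"Bad driver URL: $url"))

  // Look up the driver, ask it to register, then map the result to the
  // follow-up action the backend would take.
  def register(url: String, executorId: String, driver: DriverRef): Future[String] =
    lookupDriver(url, driver)
      .flatMap(ref => ref.askRegister(executorId))(sameThread)
      .transform {
        case Success(_) => Success("send RegisteredExecutor to self")
        case Failure(e) => Success(s"exit: ${e.getMessage}")
      }(sameThread)

  // Blocking helper intended for demos and tests only.
  def registerAndWait(url: String, executorId: String, driver: DriverRef): String =
    Await.result(register(url, executorId, driver), 5.seconds)
}
```

As in the original method, a failure at either step (the lookup or the ask) ends up in the same failure branch, because `flatMap` propagates the first failed future.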

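Once the executor has sent the message, the driver receives it. The driver, however, runs as a thread rather than as a separate process, so the request arrives through its SparkContext; concretely, it is handled inside CoarseGrainedSchedulerBackend (the original post illustrates this with a figure).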

The driver handles the RegisterExecutor request and replies:

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        if (executorDataMap.contains(executorId)) {
          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
        } else if (scheduler.nodeBlacklist.contains(hostname) ||
            isBlacklisted(executorId, hostname)) {
          // If the cluster manager gives us an executor on a blacklisted node (because it
          // already started allocating those resources before we informed it of our blacklist,
          // or if it ignored our blacklist), then we reject that executor immediately.
          logInfo(s"Rejecting $executorId as it has been blacklisted.")
          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          // Record the mapping from executor address to executor ID
          addressToExecutorId(executorAddress) = executorId
          // Update the total core count
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val resourcesInfo = resources.map{ case (k, v) =>
            (v.name,
             new ExecutorResourceInfo(v.name, v.addresses,
               // tell the executor it can schedule resources up to numParts times,
               // as configured by the user, or set to 1 as that is the default (1 task/resource)
               taskResourceNumParts.getOrElse(v.name, 1)))
          }
          val data = new ExecutorData(executorRef, executorAddress, hostname,
            0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
            resourcesInfo, resourceProfileId)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          // Note: some tests expect the reply to come after we put the executor in the map
          // Reply that registration succeeded
          context.reply(true)
        }
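The decision logic of this branch (reject duplicates, reject excluded hosts, otherwise record the executor and update the counters) can be condensed into a small sketch. `DriverRegistry`, `Reply`, `Registered` and `Rejected` are hypothetical names, and the whole method is synchronized here for simplicity, whereas the real code only synchronizes the block that mutates the shared map and counters.

```scala
import scala.collection.mutable

object DriverRegistry {
  // Hypothetical reply types standing in for context.reply / context.sendFailure.
  sealed trait Reply
  case object Registered extends Reply
  final case class Rejected(reason: String) extends Reply
}

// Simplified, assumed stand-in for the bookkeeping done when an executor registers.
class DriverRegistry(excludedHosts: Set[String]) {
  import DriverRegistry._

  // executorId -> number of cores; plays the role of executorDataMap.
  private val executorCores = mutable.HashMap.empty[String, Int]
  private var totalCores = 0

  def register(executorId: String, hostname: String, cores: Int): Reply = this.synchronized {
    if (executorCores.contains(executorId)) {
      // Same ID registered twice: refuse the second registration.
      Rejected(s"Duplicate executor ID: $executorId")
    } else if (excludedHosts.contains(hostname)) {
      // Executor was started on a host the scheduler refuses to use.
      Rejected(s"Executor is blacklisted: $executorId")
    } else {
      executorCores.put(executorId, cores)
      totalCores += cores
      Registered
    }
  }

  def totalCoreCount: Int = this.synchronized(totalCores)
  def registeredExecutors: Int = this.synchronized(executorCores.size)
}
```

Note that a rejected registration leaves the counters untouched, just as the original branch only updates `totalCoreCount` and `totalRegisteredExecutors` in the final `else`.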

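After the driver replies that registration succeeded, the ask future in onStart completes and the backend sends RegisteredExecutor to itself. Its receive method then handles that message and creates the Executor object: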

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
......
override def receive: PartialFunction[Any, Unit] = {
    // Handle the message signalling successful registration
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Build the Executor object
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
        // and notify the driver by sending LaunchedExecutor with this executorId
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
        executor.launchTask(this, taskDesc)
      }
    ......
  }

}
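The ordering constraint in receive (a LaunchTask is only valid after RegisteredExecutor has created the executor) can be shown with a reduced model. Everything here is hypothetical: `BackendSketch` and its messages only mimic the two cases above, and instead of exiting or running tasks the sketch records what it would have done in `events`.

```scala
import scala.collection.mutable.ArrayBuffer

object BackendSketch {
  // Hypothetical messages mirroring the two cases shown above.
  case object RegisteredExecutor
  final case class LaunchTask(taskId: Long)
}

class BackendSketch(executorId: String) {
  import BackendSketch._

  // Stays None until RegisteredExecutor arrives; replaces the null check on `executor`.
  private var executor: Option[String] = None

  // Records the actions the backend would take, in order.
  val events = ArrayBuffer.empty[String]

  val receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      // Create the (placeholder) executor, then report back to the driver.
      executor = Some(s"Executor-$executorId")
      events += s"LaunchedExecutor($executorId)"

    case LaunchTask(taskId) =>
      executor match {
        case None    => events += "exit: Received LaunchTask command but executor was null"
        case Some(_) => events += s"launchTask($taskId)"
      }
  }
}
```

Because `receive` is a `PartialFunction`, messages that match no case are simply not defined for it, which can be checked with `isDefinedAt`.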

At this point the Spark compute environment is fully set up: the ResourceManager, NodeManager, ApplicationMaster, Driver, and Executor have all been created successfully.

Finally, the original post closes with a flow chart of the whole submission process.

Source: 内存溢出 (please credit the source when reposting)

Original post: http://outofmemory.cn/zaji/5689026.html
