The Inbox structure:
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  // The message queue of this endpoint
  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  @GuardedBy("this")
  private var stopped = false

  @GuardedBy("this")
  private var enableConcurrent = false

  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  inbox.synchronized {
    // OnStart is the first message placed into the endpoint's (e.g. the Executor's) inbox
    messages.add(OnStart)
  }
  ......
}
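For context, the Dispatcher's message loop drains this queue and hands each message to the endpoint. A simplified sketch of that dispatch step is shown below; it is illustrative only, not the verbatim Spark source (error handling, concurrency bookkeeping, and several message types are omitted), and it is meant to live inside the Inbox class above:

// Simplified, illustrative sketch of Inbox dispatch (not the exact Spark source).
// Because OnStart was enqueued in the constructor, it is the first message matched,
// so endpoint.onStart() runs before any receive / receiveAndReply call.
def process(): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    message = messages.poll()   // take the next queued message; OnStart comes first
    if (message == null) return
  }
  message match {
    case OnStart =>
      endpoint.onStart()        // lifecycle hook, e.g. CoarseGrainedExecutorBackend.onStart
    case OneWayMessage(sender, content) =>
      endpoint.receive(content) // fire-and-forget messages sent via RpcEndpointRef.send
    case RpcMessage(sender, content, context) =>
      endpoint.receiveAndReply(context)(content) // ask-style messages expecting a reply
    case OnStop =>
      endpoint.onStop()         // lifecycle hook invoked when the endpoint is stopped
  }
}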
A note on this: in Spark's RPC layer every endpoint has a life cycle, as the comment in the source puts it:
constructor -> onStart -> receive* -> onStop
private[spark] trait RpcEndpoint {......}
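The trait body is elided above; the key lifecycle hooks it declares look roughly like this (abbreviated — default method bodies and other callbacks such as onError/onConnected are omitted):

private[spark] trait RpcEndpoint {
  // The RpcEnv this endpoint is registered with
  val rpcEnv: RpcEnv

  // Invoked before the endpoint starts handling any messages
  def onStart(): Unit = {}

  // Handles messages sent with RpcEndpointRef.send (no reply expected)
  def receive: PartialFunction[Any, Unit]

  // Handles messages sent with RpcEndpointRef.ask; the reply goes through the context
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  // Invoked when the endpoint is about to stop
  def onStop(): Unit = {}
}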
For the Executor, processing the OnStart message means invoking onStart on the CoarseGrainedExecutorBackend endpoint:
override def onStart(): Unit = {
  logInfo("Connecting to driver: " + driverUrl)
  try {
    _resources = parseOrFindResources(resourcesFileOpt)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
  // Obtain a reference to the driver endpoint
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Ask the driver to register this Executor
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
      extractAttributes, _resources, resourceProfile.id))
  }(ThreadUtils.sameThread).onComplete {
    case Success(_) =>
      self.send(RegisteredExecutor)
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
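The messages exchanged in this handshake are plain case classes and case objects declared in CoarseGrainedClusterMessages. A sketch of the ones involved here is shown below; the exact field lists may differ slightly between Spark versions:

// Sketch of the handshake messages (see CoarseGrainedClusterMessages; fields may
// vary slightly between Spark versions).
case class RegisterExecutor(                        // Executor -> Driver (ask)
    executorId: String,
    executorRef: RpcEndpointRef,
    hostname: String,
    cores: Int,
    logUrls: Map[String, String],
    attributes: Map[String, String],
    resources: Map[String, ResourceInformation],
    resourceProfileId: Int)
  extends CoarseGrainedClusterMessage

case object RegisteredExecutor                      // Executor -> itself, after the ask succeeds
  extends CoarseGrainedClusterMessage

case class LaunchedExecutor(executorId: String)     // Executor -> Driver (send)
  extends CoarseGrainedClusterMessage

case class LaunchTask(data: SerializableBuffer)     // Driver -> Executor (send)
  extends CoarseGrainedClusterMessage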
After the Executor sends this message, the Driver receives it. The Driver itself is just a thread running the user program; the request is actually received through the SparkContext — more precisely, through the DriverEndpoint that its CoarseGrainedSchedulerBackend registers with the driver-side RpcEnv (see the sketch below).
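A simplified sketch of that registration is shown here; it is not the verbatim source (method names and arguments vary by Spark version), but the endpoint name "CoarseGrainedScheduler" and the setupEndpoint call are the relevant parts:

// Simplified sketch of how the driver-side endpoint is registered inside
// CoarseGrainedSchedulerBackend (abbreviated; details vary by Spark version).
override def start(): Unit = {
  // DriverEndpoint is the endpoint whose receiveAndReply handles RegisterExecutor,
  // i.e. the code shown in the next snippet.
  driverEndpoint = rpcEnv.setupEndpoint(
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME,   // "CoarseGrainedScheduler"
    createDriverEndpoint())
}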
The Driver handles the RegisterExecutor request and replies:
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
    attributes, resources, resourceProfileId) =>
  if (executorDataMap.contains(executorId)) {
    context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
  } else if (scheduler.nodeBlacklist.contains(hostname) ||
      isBlacklisted(executorId, hostname)) {
    // If the cluster manager gives us an executor on a blacklisted node (because it
    // already started allocating those resources before we informed it of our blacklist,
    // or if it ignored our blacklist), then we reject that executor immediately.
    logInfo(s"Rejecting $executorId as it has been blacklisted.")
    context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    // Record the Executor's address
    addressToExecutorId(executorAddress) = executorId
    // Update the total core count
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val resourcesInfo = resources.map { case (k, v) =>
      (v.name,
        new ExecutorResourceInfo(v.name, v.addresses,
          // tell the executor it can schedule resources up to numParts times,
          // as configured by the user, or set to 1 as that is the default (1 task/resource)
          taskResourceNumParts.getOrElse(v.name, 1)))
    }
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
      resourcesInfo, resourceProfileId)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Note: some tests expect the reply to come after we put the executor in the map
    // Reply that registration succeeded
    context.reply(true)
  }
When the Driver's reply indicates that registration succeeded, the backend sends RegisteredExecutor to itself; on receiving that message it creates the Executor object:
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

  ......

  override def receive: PartialFunction[Any, Unit] = {
    // Receive the message confirming successful registration
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Build the Executor object
        executor = new Executor(executorId, hostname, env, userClassPath,
          isLocal = false, resources = _resources)
        // Notify the Driver that this Executor has been launched
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
        executor.launchTask(this, taskDesc)
      }
    ......
  }
}
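When a LaunchTask message later arrives, executor.launchTask hands the decoded TaskDescription to the executor's task thread pool. A simplified sketch of what that call does inside Executor is shown below (illustrative only; the real method also does bookkeeping for metrics and kill requests):

// Simplified sketch of Executor.launchTask (illustrative, not the verbatim source).
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // TaskRunner is a Runnable that deserializes the task, runs it, and reports
  // status back to the Driver through the ExecutorBackend.
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)  // tracked so it can be killed / reported
  threadPool.execute(tr)                        // executed on the task thread pool
}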
At this point the Spark runtime environment is fully up: the NodeManager, ResourceManager, ApplicationMaster, Driver, and Executor have all been created.
Finally, the overall submission flow diagram: