A Walkthrough of Spark RDD Partitions and Scheduler Scheduling


Problem Background

A business team reported an issue: for the same Spark computation, with an identical data source, execution code, and client submission configuration, the first run produced no output after several hours and was killed, while the second run after that failure produced results in 18 minutes. My task was to analyze the cause and provide a solution so that similar problems do not recur.

Note: this write-up is only a walkthrough of the problem and does not involve any business information.

Problem Analysis

Comparing the details in Spark History and the logs showed that, for the same execution plan, the failed run's Stage parallelism was 10, while the successful run's was 500.

From the driver log we can see that before Job 0 ran, only 4 executors had been received; one of them was used to run the driver, and the remaining three had 4, 4, and 2 cores respectively, so the total available parallelism was just 10. As a result, when Stage 0 of Job 0 started, only 10 tasks were created. Checking the queue resources at that time, the pending:available ratio was 8:1, i.e. for every 1 unit of available resource there were 8 units waiting, so resources were extremely tight.

21/12/30 04:10:07 INFO Utils: Using initial executors = 150, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances 
...
21/12/30 04:10:07 INFO YarnAllocator: Will request 150 executor container(s), each with 8 core(s) and 11264 MB memory (including 1024 MB of overhead) // per the client configuration: 150 executors, each with 8 cores and 10 GB of memory
21/12/30 04:10:07 INFO YarnAllocator: Submitted 150 unlocalized container requests.
21/12/30 04:10:07 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
21/12/30 04:10:34 INFO YarnAllocator: Launching container container_e188_1639619541647_741615_01_000007 on host fs-hiido-dn-12-9-224.hiido.host.yydevops.com for executor with ID 1
21/12/30 04:10:34 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them. // 1 container allocated by YARN so far; an executor is launched on it
...
21/12/30 04:10:46 INFO YarnAllocator: Launching container container_e188_1639619541647_741615_01_000152 on host fs-hiido-dn-12-8-92.hiido.host.yydevops.com for executor with ID 2
21/12/30 04:10:46 INFO YarnAllocator: Launching container container_e188_1639619541647_741615_01_000153 on host fs-hiido-dn-12-64-165.hiido.host.yydevops.com for executor with ID 3
21/12/30 04:10:46 INFO YarnAllocator: Launching container container_e188_1639619541647_741615_01_000154 on host fs-hiido-dn-12-15-14.hiido.host.yydevops.com for executor with ID 4
21/12/30 04:10:46 INFO YarnAllocator: Received 3 containers from YARN, launching executors on 3 of them. // 3 more containers allocated by YARN; executors launched on them
... // after this point, no further containers were allocated by YARN
21/12/30 04:10:50 INFO DAGScheduler: Got map stage job 0 (run at ThreadPoolExecutor.java:1149) with 10 output partitions // the RDD has 10 partitions, so 10 tasks are created to run concurrently
...
21/12/30 04:10:50 INFO DAGScheduler: Final stage: ShuffleMapStage 0 (run at ThreadPoolExecutor.java:1149)
21/12/30 04:10:50 INFO DAGScheduler: Parents of final stage: List()
21/12/30 04:10:50 INFO DAGScheduler: Missing parents: List()
21/12/30 04:10:50 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[8] at run at ThreadPoolExecutor.java:1149), which has no missing parents
...
21/12/30 04:10:50 INFO DAGScheduler: Submitting 10 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[8] at run at ThreadPoolExecutor.java:1149) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
21/12/30 04:10:50 INFO YarnClusterScheduler: Adding task set 0.0 with 10 tasks

The problem was eventually solved by setting spark.default.parallelism: the job then only starts running once the 500 units of parallelism are in place, instead of kicking off with the handful of cores that happened to have registered. The reason why is explained in the source code walkthrough below.

set spark.default.parallelism=500
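
For reference, the same fix can be applied when the session is built in application code rather than through the SQL client. The following is only a sketch under that assumption (the app name is illustrative); the command-line equivalent would be spark-submit --conf spark.default.parallelism=500.

import org.apache.spark.sql.SparkSession

// Sketch only: pin the default parallelism up front instead of letting it
// fall back to however many cores have registered at job-submission time.
val spark = SparkSession.builder()
  .appName("parallelism-demo")                // illustrative name
  .config("spark.default.parallelism", "500")
  .getOrCreate()
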
Source Code Walkthrough

Concepts

First, looking at Spark History: each Query corresponds to one SQL statement. Some queries do not generate a Job at all (e.g. set statements), while others do; a Job is only generated when an Action on an RDD needs to be executed.

Each Job is split into one or more Stages; the implementation is org.apache.spark.scheduler.DAGScheduler. It walks the RDD lineage from back to front, and whenever a parent dependency is an instance of ShuffleDependency, i.e. a wide dependency, a Stage boundary is created. The partition count of each Stage is determined by its final RDD.

A Task is the execution unit of a Stage; the Stage's partition count determines how many Tasks, and therefore how much parallelism, the Stage runs with.

In Spark on YARN mode, every Task runs on an Executor allocated by YARN, which is a JVM process. One Executor can run multiple Tasks of the same Stage concurrently, with each Task occupying one thread (one core slot).
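
To illustrate these concepts, here is a minimal sketch (the data and partition counts are made up, not taken from the job in question): the shuffle introduced by reduceByKey creates a Stage boundary, and each Stage's partition count determines how many Tasks it runs.

import org.apache.spark.sql.SparkSession

// Minimal sketch for illustration only.
val spark = SparkSession.builder().appName("stage-demo").getOrCreate()
val sc = spark.sparkContext

val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 4) // 4 partitions -> 4 tasks in the map stage
val counts = words.map(w => (w, 1))   // narrow dependency: stays in the same Stage
                  .reduceByKey(_ + _) // ShuffleDependency: a new Stage boundary

// Nothing runs until an Action is called; collect() is the Action that submits the Job.
counts.collect().foreach(println)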

org.apache.spark.scheduler.DAGScheduler

// Called once for each ShuffleDependency encountered, splitting off a new Stage
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
    val numTasks = rdd.partitions.length // for this problem, this is the key value to look at
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

org.apache.spark.ShuffleDependency; the RDD here is an abstract class.

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

Depending on the key/value types and the transformation that produced it, the concrete RDD class differs. From the log above we can tell that this RDD's implementation class is MapPartitionsRDD.

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions
...
}
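
In other words, a narrow transformation such as map wraps its parent RDD in a MapPartitionsRDD and inherits the parent's partitions unchanged, so the partition count simply propagates down the lineage. A minimal sketch to make that visible (app name and numbers are illustrative, not from the job in question):

import org.apache.spark.sql.SparkSession

// Sketch only: map() produces a MapPartitionsRDD whose getPartitions simply
// returns firstParent's partitions, so the partition count is inherited as-is.
val sc = SparkSession.builder().appName("partition-demo").getOrCreate().sparkContext

val source = sc.parallelize(1 to 100, numSlices = 10)
val mapped = source.map(_ * 2) // a MapPartitionsRDD under the hood

println(source.getNumPartitions) // 10
println(mapped.getNumPartitions) // still 10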

org.apache.spark.scheduler.TaskSchedulerImpl

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
 }


override def defaultParallelism(): Int = backend.defaultParallelism() // degree of parallelism
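
The schedulingMode that initialize() switches on comes from the spark.scheduler.mode configuration, which defaults to FIFO. As a hedged sketch (illustrative app name), an application that wants the FairSchedulableBuilder branch would set it like this:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only: select the scheduling mode that initialize() switches on.
val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")   // illustrative name
  .set("spark.scheduler.mode", "FAIR") // default is FIFO -> FIFOSchedulableBuilder

val spark = SparkSession.builder().config(conf).getOrCreate()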

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

override def defaultParallelism(): Int = {
    // Lookup rule: if spark.default.parallelism is not configured, the value is determined by totalCoreCount
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}  
 
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
          ...
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")  //这里有日志
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores) // totalCoreCount accumulates the available core count here
          totalRegisteredExecutors.addAndGet(1)
          ...
        }
    ...
}

From this call chain we can see that, when spark.default.parallelism is not set, the parallelism Tasks run with = number of Executors * cores per Executor (i.e. the total number of registered cores).
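
A quick way to confirm the rule on a live application: sc.defaultParallelism goes through exactly the code path above, returning spark.default.parallelism if it is configured and max(total registered cores, 2) otherwise. The snippet below is a sketch that assumes sc is the SparkContext of an already running application:

// Sketch only: observe the fallback rule shown above (assumes sc: SparkContext).
val configured = sc.getConf.getOption("spark.default.parallelism")
println(s"spark.default.parallelism = $configured")
println(s"sc.defaultParallelism     = ${sc.defaultParallelism}")

// With the fix (spark.default.parallelism=500) the second line prints 500 even if only a
// few executors have registered; without it, it tracks the total registered core count.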
