有多种方法可以从Spark获取执行程序的数量和集群中的核心数量。这是我过去使用的一些Scala实用程序代码。您应该可以轻松地使其适应Java。有两个关键思想:
工人人数是执行者人数减去一或
sc.getExecutorStorageStatus.length - 1
。每个工人的核心数可以通过
java.lang.Runtime.getRuntime.availableProcessors
在一个工人上执行来获得。
其余代码是为
SparkContext使用Scala隐式添加便利方法的样板。我在1.x年前编写了代码,这就是为什么不使用的原因
SparkSession。
最后一点:合并多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。在实践中,我使用1.5倍至4倍之间的任何值,具体取决于数据大小以及作业是否在共享群集上运行。
import org.apache.spark.SparkContextimport scala.language.implicitConversionsclass RichSparkContext(val sc: SparkContext) { def executorCount: Int = sc.getExecutorStorageStatus.length - 1 // one is the driver def coresPerExecutor: Int = RichSparkContext.coresPerExecutor(sc) def coreCount: Int = executorCount * coresPerExecutor def coreCount(coresPerExecutor: Int): Int = executorCount * coresPerExecutor}object RichSparkContext { trait Enrichment { implicit def enrichmetadata(sc: SparkContext): RichSparkContext = new RichSparkContext(sc) } object implicits extends Enrichment private var _coresPerExecutor: Int = 0 def coresPerExecutor(sc: SparkContext): Int = synchronized { if (_coresPerExecutor == 0) sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head else _coresPerExecutor }}
更新资料
最近getExecutorStorageStatus已被删除。我们已切换为使用
SparkEnv的`blockManager.master.getStorageStatus.length
1
(减号再次用于驱动程序)。正常的方式来得到它,通过env
的SparkContext
是没有的外部访问org.apache.spark`包。因此,我们使用封装违规模式:
package org.apache.sparkobject EncapsulationViolator { def sparkEnv(sc: SparkContext): SparkEnv = sc.env}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)