Table of Contents
Background
Prometheus Sink
How to expose an HTTP endpoint
Service registration source
Service discovery source
How to collect Spark metrics
MetricsReporter source
Enabling the Prometheus sink via Spark conf
Problems starting the Prometheus Sink
Spark Metrics System
Spark startup source-code analysis
spark-submit.sh
SparkSubmit execution flow
--jars, --driver-library-path and --driver-class-path
Spark main jar
YarnClusterApplication
AM startup flow
Setting the AM CLASSPATH environment variable
Uploading container resources to HDFS
Launching the AM
ApplicationMaster
Fixing the ClassNotFound
Executors also hit ClassNotFoundException
Background
We have a multi-module project containing ModuleA, ModuleB and ModuleC.
ModuleA: defines the main function; it loads the concrete logic of Spark's foreachRDD from ModuleC via SPI and reflection.
ModuleB: defines the Spark Streaming context configuration and the DStream processing order; the Prometheus Sink code lives in this module.
ModuleC: defines the concrete foreachRDD logic, for example deserializing the RDD objects and writing them to HBase.
The Spark job is launched with spark-submit using the yarn-cluster deploy mode.
Prometheus Sink
Adding Prometheus support mainly involves two challenges.
How to expose an HTTP endpoint
Because the number of submitted Spark jobs is not fixed and the jobs are deployed on YARN, ports have to be allocated dynamically; and because Prometheus uses a pull model, the HTTP endpoint has to support service discovery.
Prometheus 2.30 supports HTTP Service Discovery, so I decided to implement service registration and service discovery with ZooKeeper plus Spring Boot.
Service registration source
package ...;

import com.codahale.metrics.MetricRegistry;
import ....PrometheusReporter;
import io.prometheus.client.exporter.HTTPServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.spark.metrics.sink.Sink;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class PrometheusSink implements Sink {

    private final static Logger LOG = LoggerFactory.getLogger(PrometheusSink.class);

    private final static String ZK_HOSTS_KEY = "zkHosts";
    private final static String ZK_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
    private final static String ZK_CONNECT_TIMEOUT_MS = "connectTimeoutMs";
    private final static String ZK_RETRY_INTERVAL_MS = "retryIntervalMs";
    private final static String SPARK_JOB_NAME = "jobName";

    private final Properties property;
    private final MetricRegistry metricRegistry;
    private HTTPServer httpServer;
    private CuratorFramework zkClient;
    private PrometheusReporter prometheusReporter;

    public PrometheusSink(Properties property, MetricRegistry registry,
                          org.apache.spark.SecurityManager securityMgr) {
        this.property = property;
        this.metricRegistry = registry;
    }

    @Override
    public void start() {
        try {
            String zkHosts = this.property.getProperty(ZK_HOSTS_KEY);
            if (StringUtils.isEmpty(zkHosts))
                throw new RuntimeException("[[instance].sink.prometheus.zkHosts] must not be empty");
            int sessionTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_SESSION_TIMEOUT_MS, "30000"));
            int connectTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_CONNECT_TIMEOUT_MS, "10000"));
            int retryInterval = Integer.parseInt(this.property
                    .getProperty(ZK_RETRY_INTERVAL_MS, "3000"));
            String jobName = this.property.getProperty(SPARK_JOB_NAME);
            if (StringUtils.isEmpty(jobName))
                throw new RuntimeException("[[instance].sink.prometheus.jobName] must not be empty");

            this.prometheusReporter = new PrometheusReporter(this.metricRegistry, jobName);

            // Bind to an ephemeral port so that multiple jobs on the same host do not collide.
            int retryCount = 0;
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
            String url = "http://%s:%d/metrics";
            String ip = null;
            int port;
            while (true) {
                try {
                    try (ServerSocket serverSocket = new ServerSocket()) {
                        serverSocket.bind(inetSocketAddress);
                        ip = inetSocketAddress.getAddress().getHostAddress();
                        port = serverSocket.getLocalPort();
                        url = String.format(url, ip, port);
                    }
                    this.httpServer = new HTTPServer(port);
                    break;
                } catch (IOException ioException) {
                    if (++retryCount == 3) {
                        throw ioException;
                    }
                }
            }
            LOG.info("start [Prometheus] metrics sink\nHttp EndPoint:{}", url);

            // Register with ZooKeeper
            this.zkClient = CuratorFrameworkFactory.newClient(zkHosts, sessionTimeoutMs,
                    connectTimeoutMs, new RetryForever(retryInterval));
            this.zkClient.start();
            LOG.info("started zk client, zkHosts=[{}], sessionTimeout=[{}] ms, connectTimeout=[{}] ms,"
                    + " failOverRetryInterval=[{}] ms", zkHosts, sessionTimeoutMs, connectTimeoutMs, retryInterval);
            this.zkClient.blockUntilConnected();
            String zNodePath = "/waterfall/spark/metrics/promethues/" + jobName + "/config";
            this.zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(zNodePath, url.getBytes(StandardCharsets.US_ASCII));
        } catch (Exception e) {
            throw new RuntimeException("failed to start prometheus metrics", e);
        }
        this.prometheusReporter.start(15, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        if (zkClient != null) {
            zkClient.close();
        }
        if (this.httpServer != null)
            this.httpServer.close();
        this.prometheusReporter.stop();
    }

    @Override
    public void report() {
        // Judging from the Spark source, report() is only triggered right before the JVM stops.
        LOG.info("JVM is about to stop; report() is called once automatically");
        this.prometheusReporter.report();
    }
}

Service discovery source
package ...;

import ....config.zookeeper.ZookeeperConfigProperties;
import ....config.mvc.exception.BusinessException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FlinkServiceDiscovery implements Closeable {

    private final static Logger LOG = LoggerFactory.getLogger(FlinkServiceDiscovery.class);

    private final ZookeeperConfigProperties zkConfig;
    private final Path parentPath;
    private CuratorFramework zkClient;
    private TreeCache treeCache;
    // Currently running tasks: task name -> set of metric endpoints
    private final Map<String, Set<String>> activeServiceMap = new HashMap<>();
    private final ReadWriteLock serviceMapLock = new ReentrantReadWriteLock();
    // The two fields below make sure the Spring context only starts after the
    // initial TreeCache load has completed.
    private final AtomicBoolean hasStart = new AtomicBoolean(false);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public FlinkServiceDiscovery(ZookeeperConfigProperties zkConfig) {
        this.zkConfig = zkConfig;
        parentPath = Paths.get(this.zkConfig.getFlinkMetricsZnodePath());
    }

    public boolean start() throws InterruptedException {
        LOG.info("starting service discovery for the metrics server, zookeeper: [{}], watched ZNode path: [{}]",
                this.zkConfig.getZkHosts(), this.zkConfig.getFlinkMetricsZnodePath());
        this.zkClient = CuratorFrameworkFactory
                .newClient(this.zkConfig.getZkHosts(), new RetryForever(3000));
        this.zkClient.start();
        // Wait until the connection is established
        this.zkClient.blockUntilConnected();
        try {
            Stat stat = this.zkClient.checkExists()
                    .forPath(this.zkConfig.getFlinkMetricsZnodePath());
            if (stat == null) {
                LOG.warn("ZNode path [{}] does not exist, service discovery will not be started",
                        this.zkConfig.getFlinkMetricsZnodePath());
                return false;
            }
            LOG.info("start watching ZNode [{}]", this.zkConfig.getFlinkMetricsZnodePath());
            this.treeCache = TreeCache.newBuilder(this.zkClient, this.zkConfig.getFlinkMetricsZnodePath())
                    .setCacheData(true)
                    .build();
            treeCache.getListenable().addListener(this::ZnodeListenerFunction);
            this.treeCache.start();
            return true;
        } catch (Exception exception) {
            LOG.warn("failed to set up service discovery", exception);
            try {
                this.close();
            } catch (IOException ioException) {
                // do nothing
            }
        }
        return false;
    }

    public void waitUntilWatcherReady() throws InterruptedException {
        LOG.info("waiting for the watcher node cache to initialize");
        if (!this.hasStart.get()) {
            this.countDownLatch.await();
        }
        LOG.info("watcher node cache initialized");
    }

    public Map<String, Set<String>> findAllActiveServices() {
        try {
            this.serviceMapLock.readLock().lock();
            return this.activeServiceMap;
        } finally {
            this.serviceMapLock.readLock().unlock();
        }
    }

    private void ZnodeListenerFunction(CuratorFramework curatorFramework, TreeCacheEvent event) {
        switch (event.getType()) {
            case NODE_ADDED:
                this.addServiceMap(event);
                break;
            case NODE_UPDATED:
                LOG.warn("node update,{}", event.getData().toString());
                // should not happen in practice
                break;
            case NODE_REMOVED:
                this.deleteServiceMap(event);
                break;
            case CONNECTION_LOST:
            case CONNECTION_SUSPENDED:
                LOG.warn("zookeeper connection lost, event type={}", event.getType());
                break;
            case INITIALIZED:
                LOG.debug("event type={}", event.getType());
                if (this.hasStart.compareAndSet(false, true)) {
                    this.countDownLatch.countDown();
                }
                break;
            default:
                LOG.debug("event type={}", event.getType());
        }
    }

    private void addServiceMap(TreeCacheEvent addEvent) {
        String watchPath = addEvent.getData().getPath();
        byte[] watchDataArray = addEvent.getData().getData();
        String watchData = "";
        if (watchDataArray != null && watchDataArray.length > 0) {
            watchData = new String(watchDataArray, StandardCharsets.UTF_8);
        }
        LOG.debug("node add,path={},data={}", watchPath, watchData);
        // ZooKeeper addresses resources much like a file-system path,
        // so java.nio.Path is used to parse it.
        Path path = Paths.get(watchPath);
        Path subPath = this.parentPath.relativize(path);
        if (subPath.getNameCount() == 2) {
            String taskName = subPath.getName(0).toString();
            try {
                this.serviceMapLock.writeLock().lock();
                if (this.activeServiceMap.containsKey(taskName)) {
                    Set<String> set = this.activeServiceMap.get(taskName);
                    set.add(watchData);
                } else {
                    Set<String> set = new HashSet<>();
                    set.add(watchData);
                    this.activeServiceMap.put(taskName, set);
                }
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else
            LOG.warn("ADD event for unexpected ZNode path={}", watchPath);
    }

    private void deleteServiceMap(TreeCacheEvent delEvent) {
        String watchPath = delEvent.getData().getPath();
        byte[] watchDataArray = delEvent.getData().getData();
        String watchData = "";
        if (watchDataArray != null && watchDataArray.length > 0) {
            watchData = new String(watchDataArray, StandardCharsets.UTF_8);
        }
        LOG.debug("node delete,path={},data={}", watchPath, watchData);
        Path path = Paths.get(watchPath);
        Path subPath = this.parentPath.relativize(path);
        if (subPath.getNameCount() == 1) {
            String taskName = subPath.getFileName().toString();
            LOG.warn("ZNode was deleted manually, path={}", watchPath);
            if (!StringUtils.hasText(taskName))
                return;
            try {
                this.serviceMapLock.writeLock().lock();
                this.activeServiceMap.remove(taskName);
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else if (subPath.getNameCount() == 2) {
            String taskName = subPath.getName(0).toString();
            LOG.debug("task [{}] with metric server [{}] has terminated", taskName, watchData);
            try {
                this.serviceMapLock.writeLock().lock();
                if (this.activeServiceMap.containsKey(taskName)) {
                    this.activeServiceMap.get(taskName).remove(watchData);
                    if (this.activeServiceMap.get(taskName).isEmpty()) {
                        this.activeServiceMap.remove(taskName);
                    }
                }
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else
            LOG.warn("DELETE event for unexpected ZNode path={}", watchPath);
    }

    @Override
    public void close() throws IOException {
        LOG.info("shutting down metrics service discovery");
        if (this.treeCache != null)
            this.treeCache.close();
        if (this.zkClient != null) {
            this.zkClient.close();
            this.zkClient = null;
        }
    }
}
How to collect Spark metrics
Spark's metrics system is built on the Codahale/Dropwizard metrics data structures, which are not compatible with Prometheus, so a Reporter class is needed to take care of the conversion.
MetricsReporter source
package ...;

import com.codahale.metrics.*;

import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PrometheusReporter extends ScheduledReporter {

    private static final Pattern APPLICATION_ID_REGEX = Pattern.compile("(application_\\d+_\\d+)");
    private static io.prometheus.client.Gauge GAUGE_COLLECTOR = null;
    // NOTE: this ties into the spark-submit source below: the AM blocks (100 s by default)
    // waiting for the driver-side SparkContext to finish initializing.
    private static AtomicInteger INDENTITY_COUNTER = new AtomicInteger(0); // 1: driver, 2: AM

    public PrometheusReporter(MetricRegistry registry, String jobName) {
        super(registry, "prometheus-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
        jobName = jobName.replaceAll("\\.", "_");
        String gaugeNameSuffix = null;
        if (INDENTITY_COUNTER.incrementAndGet() == 1) {
            gaugeNameSuffix = "_driver";
        } else {
            gaugeNameSuffix = "_am";
        }
        GAUGE_COLLECTOR = io.prometheus.client.Gauge
                .build("spark_guage_" + jobName + gaugeNameSuffix, "spark gauge metrics")
                .labelNames("application_id", "origin_lable").register();
    }

    @SuppressWarnings("all")
    @Override
    public void report(SortedMap<String, Gauge> gaugeMap, SortedMap<String, Counter> countMap,
                       SortedMap<String, Histogram> histogramMap, SortedMap<String, Meter> meterMap,
                       SortedMap<String, Timer> timerMap) {
        if (gaugeMap != null) {
            for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
                String metricName = gaugeEntry.getKey();
                String[] splitKeyworlds = this.spilitMetricName(metricName);
                Gauge gauge = gaugeEntry.getValue();
                if (gauge.getValue() instanceof Number) {
                    GAUGE_COLLECTOR.labels(splitKeyworlds[1], splitKeyworlds[0])
                            .set(Double.parseDouble(String.valueOf(gauge.getValue())));
                }
            }
        }
        if (countMap != null) {
            for (Map.Entry<String, Counter> counterEntry : countMap.entrySet()) {
                String metricName = counterEntry.getKey();
                String[] splitKeyworlds = this.spilitMetricName(metricName);
                Counter counter = counterEntry.getValue();
                GAUGE_COLLECTOR.labels(splitKeyworlds[1], splitKeyworlds[0])
                        .set(counter.getCount());
            }
        }
    }

    // Splits a Dropwizard metric name into [normalized metric name, application id].
    private String[] spilitMetricName(String metricName) {
        String[] result = new String[2];
        result[0] = metricName;
        Matcher matcher = APPLICATION_ID_REGEX.matcher(metricName);
        if (matcher.find()) {
            result[0] = metricName.replaceAll("application_\\d+_\\d+", "appid");
            result[1] = matcher.group(1);
        } else {
            result[1] = "NaN";
        }
        result[0] = result[0].replaceAll("\\.", "_");
        return result;
    }
}
Enabling the Prometheus sink via Spark conf
There are many ways to configure this, for example by adding entries to ${SPARK_HOME}/conf/metrics.properties. I personally prefer setting it through spark-submit --conf.
spark-submit --master yarn --deploy-mode cluster --conf spark.metrics.conf.*.sink.prometheus.class=...spark.metric.sink.PrometheusSink --conf spark.metrics.conf.*.sink.prometheus.zkHosts=xxx:2181 --conf spark.metrics.conf.*.sink.prometheus.sessionTimeoutMs=30000 --conf spark.metrics.conf.*.sink.prometheus.connectTimeoutMs=10000 --conf spark.metrics.conf.*.sink.prometheus.retryIntervalMs=3000 --conf spark.metrics.conf.*.sink.prometheus.jobName=MyJob1 --jars moduleB.jar,moduleC.jar --class mainClass /tmp/moduleA.jar
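If you would rather use the metrics.properties file mentioned above, the same settings map onto it one-to-one by dropping the spark.metrics.conf. prefix. A sketch of the equivalent file, using the same placeholder values as the command above:

*.sink.prometheus.class=...spark.metric.sink.PrometheusSink
*.sink.prometheus.zkHosts=xxx:2181
*.sink.prometheus.sessionTimeoutMs=30000
*.sink.prometheus.connectTimeoutMs=10000
*.sink.prometheus.retryIntervalMs=3000
*.sink.prometheus.jobName=MyJob1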
At this point all of the development work required by the official metrics system documentation is done, but once the job is running you will see the following problem:
Problems starting the Prometheus Sink
22/01/27 10:20:16 ERROR metrics.MetricsSystem: Sink class ....spark.metric.sink.PrometheusSink cannot be instantiated
22/01/27 10:20:16 ERROR yarn.ApplicationMaster: Uncaught exception: java.lang.ClassNotFoundException: ....spark.metric.sink.PrometheusSink
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:242)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks.apply(MetricsSystem.scala:198)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks.apply(MetricsSystem.scala:194)
    at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
    at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
    at org.apache.spark.deploy.yarn.ApplicationMaster.createAllocator(ApplicationMaster.scala:433)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:460)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:275)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:805)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:804)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:804)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
The spark-submit command passed --jars moduleB.jar, which ships the Prometheus sink code (it lives in ModuleB), so in theory there should be no problem.
Spark Metrics System
The Spark metrics system documentation contains the following passage:
syntax: [instance].sink|source.[name].[options]=[value]

# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wildcard "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
In other words, a normally running Spark-on-YARN job has three metric instances: applications, driver and executor. Our configuration above used the wildcard *, which adds the Prometheus sink to all three instances. I then changed the configuration:
--conf spark.metrics.conf.driver.sink.prometheus.class=...spark.metric.sink.PrometheusSink --conf spark.metrics.conf.driver.sink.prometheus.zkHosts=xxx:2181 --conf spark.metrics.conf.driver.sink.prometheus.sessionTimeoutMs=30000 --conf spark.metrics.conf.driver.sink.prometheus.connectTimeoutMs=10000 --conf spark.metrics.conf.driver.sink.prometheus.retryIntervalMs=3000 --conf spark.metrics.conf.driver.sink.prometheus.jobName=MyJob1
Re-running spark-submit with this configuration works fine (note: if driver is replaced with executor in the configuration above, the ClassNotFoundException comes back, and the cause is the same as for the ApplicationMaster).
So the Spark driver and the YARN AM clearly load different sets of jars, which calls for a look at the source code.
Spark startup source-code analysis
spark-submit.sh
SparkSubmit execution flow
The main job of this class is to parse the arguments passed to spark-submit and then invoke the main method of org.apache.spark.deploy.yarn.YarnClusterApplication via reflection. The source is long and I will not walk through all of it, only the few places that matter here.
--jars, --driver-library-path and --driver-class-path
After SparkSubmit has processed these options, and before control enters YarnClusterApplication, three Spark configs are constructed:
- spark.yarn.dist.jars: equivalent to the jar list given in --jars, but with some URI conversion applied, such as resolving relative paths to absolute ones, expanding glob paths, and downloading FTP files to the local machine.
- spark.driver.extraClassPath: equivalent to --driver-class-path, unmodified.
- spark.driver.extraLibraryPath: equivalent to --driver-library-path, unmodified.
Spark main jar
The Spark main jar is referred to as the primary resource in the source. In SparkSubmit it is turned into the args passed to YarnClusterApplication's main method, in the form:
--jar moduleA.jar --class moduleA.mainClassName --arg moduleA.main.args
At this point the SparkSubmit logic is finished and control enters YarnClusterApplication.
YarnClusterApplication
This class launches the YARN AM and, inside the AM, starts the Spark driver-side code (i.e. ModuleA).
The key question is how the AM's jars are set up.
AM startup flow
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Key point: this builds the AM container's launch context; everything related
    // to jars and the CLASSPATH happens inside this call.
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}

Setting the AM CLASSPATH environment variable
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
  : ContainerLaunchContext = {
  logInfo("Setting up container launch context for our AM")
  val appId = newAppResponse.getApplicationId
  val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
  val pySparkArchives =
    if (sparkConf.get(IS_PYTHON_APP)) {
      findPySparkArchives()
    } else {
      Nil
    }
  // appStagingDirPath = hdfs://nameservice1/user/spark_cdh/.sparkStaging
  // pySparkArchives: not relevant here

  // Builds the AM's environment, which includes the CLASSPATH
  val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
  // Declares the resources the YARN container has to download, e.g. jars and log4j.properties
  val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  amContainer.setLocalResources(localResources.asJava)
  amContainer.setEnvironment(launchEnv.asJava)

  // The rest builds the AM startup script, essentially a "java -server ..." command
}
private def setupLaunchEnv(
    stagingDirPath: Path,
    pySparkArchives: Seq[String]): HashMap[String, String] = {
  logInfo("Setting up the launch environment for our AM container")
  val env = new HashMap[String, String]()
  // Key point: the AM classpath can be extended through this config entry,
  // which in turn is populated from --driver-class-path
  // DRIVER_CLASS_PATH = spark.driver.extraClassPath
  populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
  env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
  env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()

  // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
  val amEnvPrefix = "spark.yarn.appMasterEnv."
  sparkConf.getAll
    .filter { case (k, v) => k.startsWith(amEnvPrefix) }
    .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
    .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }

  // SPARK_DIST_CLASSPATH
  sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
    env(ENV_DIST_CLASSPATH) = dcp
  }
  env
}

Uploading container resources to HDFS
This method is also long; all you need to take away from it is:
- The Spark primary resource (i.e. moduleA.jar) is uploaded to HDFS and renamed __app__.jar.
- The files listed in --jars are uploaded to HDFS.
Launching the AM
The essence is that a launch command is generated, roughly of the form:
LD_LIBRARY_PATH="{spark.driver.extraLibraryPath}:$LD_LIBRARY_PATH" {{JAVA_HOME}}/bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class MyModuleAMainClass --jar ModuleA.jar --arg ... 1> /stdout 2> /stderr
[Something I have not fully figured out] spark.driver.extraLibraryPath (i.e. the paths set with --driver-library-path) does not affect the AM's CLASSPATH, so what exactly is the point of setting LD_LIBRARY_PATH here? Presumably it only matters for native (JNI) libraries, since LD_LIBRARY_PATH has no effect on Java class loading.
From this command we can tell that the jars passed with spark-submit --jars do not end up on the AM's CLASSPATH. Since the Spark driver can nevertheless start, the relationship between the AM and the driver must be as shown in the following diagram:
ApplicationMaster
// 1. The AM entry point, main thread
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}

// 2. Kerberos
final def run(): Int = {
  doAsUser {
    runImpl()
  }
  exitCode
}

private def runImpl(): Unit = {
  if (isClusterMode) {
    runDriver()
  } else {
    runExecutorLauncher()
  }
}

private def runDriver(): Unit = {
  addAmIpFilter(None)
  // Key point: start the Spark driver
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    // Key point: wait for the SparkContext to finish initializing (i.e. for the driver
    // to start successfully), but not indefinitely: 100 s by default
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv
      val userConf = sc.getConf
      val host = userConf.get("spark.driver.host")
      val port = userConf.get("spark.driver.port").toInt
      registerAM(host, port, userConf, sc.ui.map(_.webUrl))
      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // Key point: this is where the AM-side prometheus sink gets instantiated
      createAllocator(driverRef, userConf)
    }
  }
}

private def startUserApplication(): Thread = {
  // userClassLoader loads __app__.jar (i.e. moduleA.jar) and every jar passed via spark-submit --jars
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    // Calls moduleA.jar's main method, which initializes the SparkContext
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
private val userClassLoader = {
  val classpath = Client.getUserClasspath(sparkConf)
  val urls = classpath.map { entry =>
    new URL("file:" + new File(entry.getPath()).getAbsolutePath())
  }

  if (isClusterMode) {
    if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
      new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    } else {
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    }
  } else {
    new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
  }
}

Fixing the ClassNotFound
By now we know that the AM's default CLASSPATH contains none of moduleA.jar, moduleB.jar or moduleC.jar, so it is no surprise that the prometheus sink cannot be found. Add the following option so that the AM also gets moduleB.jar:
spark-submit ... --driver-class-path moduleB.jar --class xxx /moduleA.jar ...
As mentioned in the background section, ModuleA, ModuleB and ModuleC reference each other's classes, so with the configuration above the AM's JVM classloader structure looks like the diagram below:
Because of this class distribution, when ModuleA starts it loads ClassB from ModuleB, and ClassB in turn needs ClassC from ModuleC; at that point, loading ClassC from ClassB fails with a ClassNotFoundException for ClassC.
To solve this, simply make sure that the jars passed via --driver-class-path do not reference classes from --jars or from the main jar (the other direction is fine: the main jar and the --jars jars may reference the --driver-class-path jars). My solution was to extract the prometheus sink code into a standalone jar; the classloader sketch below shows why the direction matters.
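A minimal, self-contained sketch of the one-way visibility. The jar paths and class names (moduleB.ClassB, moduleC.ClassC, useClassC) are hypothetical; the point is that JVM class-loading delegation only flows from child to parent, mirroring how the AM system classpath (--driver-class-path) sits above Spark's MutableURLClassLoader (__app__.jar plus --jars).

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderVisibilityDemo {

    public static void main(String[] args) throws Exception {
        // Parent: stands in for the AM system classpath, which only sees the --driver-class-path jar.
        URLClassLoader parent = new URLClassLoader(
                new URL[]{new URL("file:/tmp/moduleB.jar")},
                ClassLoader.getSystemClassLoader());

        // Child: stands in for Spark's MutableURLClassLoader, which additionally sees
        // __app__.jar (moduleA.jar) and the --jars list (moduleC.jar).
        URLClassLoader child = new URLClassLoader(
                new URL[]{new URL("file:/tmp/moduleA.jar"), new URL("file:/tmp/moduleC.jar")},
                parent);

        // Loading ClassB through the child succeeds: the request is delegated up to the parent.
        Class<?> classB = Class.forName("moduleB.ClassB", true, child);

        // But ClassB's defining loader is the parent, so when ClassB itself references
        // moduleC.ClassC, resolution starts at the parent and never consults the child:
        // this call ends in NoClassDefFoundError / ClassNotFoundException for ClassC.
        classB.getMethod("useClassC")
              .invoke(classB.getDeclaredConstructor().newInstance());
    }
}

Flipping the dependency direction is safe, because a class defined by the child loader can always resolve classes from its parent.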
Executors also hit ClassNotFoundException
The cause is the same as for the AM; adding the following configuration to spark-submit fixes it.
--conf spark.executor.extraClassPath=spark-prometheus-metrics-1.0.jar
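For reference, one possible final combination with the sink extracted into its own jar might look roughly like the following. spark-prometheus-metrics-1.0.jar is the assumed name of the standalone sink jar, and the remaining options are carried over from the earlier commands; treat this as a sketch rather than a verified command.

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.metrics.conf.*.sink.prometheus.class=...spark.metric.sink.PrometheusSink \
  --conf spark.metrics.conf.*.sink.prometheus.zkHosts=xxx:2181 \
  --conf spark.metrics.conf.*.sink.prometheus.jobName=MyJob1 \
  --conf spark.executor.extraClassPath=spark-prometheus-metrics-1.0.jar \
  --driver-class-path spark-prometheus-metrics-1.0.jar \
  --jars moduleB.jar,moduleC.jar,spark-prometheus-metrics-1.0.jar \
  --class mainClass /tmp/moduleA.jar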