[Spark 2.4] Problems introduced by adding a Prometheus Sink, analyzed through the source code

Table of Contents

Background
Prometheus Sink
How to expose an HTTP endpoint
Service registration source
Service discovery source
How to collect Spark metrics
MetricsReporter source
Enabling the Prometheus sink via Spark conf
The Prometheus sink startup problem
Spark Metrics System
Source analysis of the Spark startup process
spark-submit.sh
SparkSubmit execution flow
--jars, --driver-library-path and --driver-class-path
spark main jar
YarnClusterApplication
AM launch flow
Setting the AM CLASSPATH environment variable
Uploading container resources to HDFS
Launching the AM
ApplicationMaster
Fixing the ClassNotFoundException
The executor also hits ClassNotFoundException

Background

We have a multi-module project consisting of ModuleA, ModuleB and ModuleC.

ModuleA: defines the main function; via SPI and reflection it loads the concrete foreachRDD logic from ModuleC.

ModuleB: defines the Spark Streaming context configuration and the DStream processing order; the Prometheus Sink code lives in this module.

ModuleC: defines the concrete foreachRDD logic, e.g. deserializing RDD records and writing them to HBase.

The Spark job is launched with spark-submit and deployed in yarn-cluster mode.

Prometheus Sink

Adding Prometheus support involves two main challenges.

How to expose an HTTP endpoint

Because the number of submitted Spark jobs is not fixed and everything runs on YARN, ports have to be allocated dynamically; and since Prometheus uses a pull model, the HTTP endpoints must be discoverable.

Prometheus 2.30 supports HTTP Service Discovery, so we decided to implement service registration and discovery with ZooKeeper plus Spring Boot. A possible Prometheus-side scrape configuration is sketched below.
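On the Prometheus side, pointing the server at such a discovery service only takes an http_sd_configs entry. The snippet below is a minimal sketch; the discovery URL and refresh interval are assumptions about our Spring Boot service, not anything mandated by Spark or Prometheus:

scrape_configs:
  - job_name: "spark_streaming_metrics"
    http_sd_configs:
      # hypothetical endpoint served by the Spring Boot discovery service shown later
      - url: "http://metrics-discovery.example.com:8080/prometheus/targets"
        refresh_interval: 60s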

Service registration source
package ...;

import com.codahale.metrics.MetricRegistry;
import ....PrometheusReporter;
import io.prometheus.client.exporter.HTTPServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryForever;
import org.apache.spark.metrics.sink.Sink;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.TimeUnit;


public class PrometheusSink implements Sink {
    private final static Logger LOG = LoggerFactory.getLogger(PrometheusSink.class);
    private final static String ZK_HOSTS_KEY = "zkHosts";
    private final static String ZK_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
    private final static String ZK_CONNECT_TIMEOUT_MS = "connectTimeoutMs";
    private final static String ZK_RETRY_INTERVAL_MS = "retryIntervalMs";
    private final static String SPARK_JOB_NAME = "jobName";
    private final Properties property;
    private final MetricRegistry metricRegistry;
    private HTTPServer httpServer;
    private CuratorFramework zkClient;
    private PrometheusReporter prometheusReporter;
    public PrometheusSink(Properties property,
                          MetricRegistry registry,
                          org.apache.spark.SecurityManager securityMgr) {
        this.property = property;
        this.metricRegistry = registry;
    }

    @Override
    public void start() {
        try {
            String zkHosts = this.property.getProperty(ZK_HOSTS_KEY);
            if (StringUtils.isEmpty(zkHosts))
                throw new RuntimeException("[[instance].sink.prometheus.zkHosts]不能为空");
            int sessionTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_SESSION_TIMEOUT_MS, "30000"));
            int connectTimeoutMs = Integer.parseInt(this.property
                    .getProperty(ZK_CONNECT_TIMEOUT_MS, "10000"));
            int retryInterval = Integer.parseInt(this.property
                    .getProperty(ZK_RETRY_INTERVAL_MS, "3000"));
            String jobName = this.property.getProperty(SPARK_JOB_NAME);
            if (StringUtils.isEmpty(jobName))
                throw new RuntimeException("[[instance].sink.prometheus.jobName]不能为空");
            this.prometheusReporter = new PrometheusReporter(this.metricRegistry,jobName);
            int retryCount = 0;
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
            String urlTemplate = "http://%s:%d/metrics";
            String url = null;
            while (true) {
                try {
                    String ip;
                    int port;
                    // bind to port 0 so the OS picks a free port, then reuse that port for the HTTP server
                    try (ServerSocket serverSocket = new ServerSocket()) {
                        serverSocket.bind(inetSocketAddress);
                        ip = inetSocketAddress.getAddress().getHostAddress();
                        port = serverSocket.getLocalPort();
                    }
                    this.httpServer = new HTTPServer(port);
                    url = String.format(urlTemplate, ip, port);
                    break;
                } catch (IOException ioException) {
                    if (++retryCount == 3) {
                        throw ioException;
                    }
                }
            }
            LOG.info("start [Prometheus] metrics sink\nHttp EndPoint: {}", url);

            // register this endpoint in ZooKeeper
            this.zkClient = CuratorFrameworkFactory.newClient(zkHosts, sessionTimeoutMs, connectTimeoutMs, new RetryForever(retryInterval));
            this.zkClient.start();
            LOG.info("started zk client, zkHosts=[{}], sessionTimeout=[{}] ms, connectTimeout=[{}] ms, " +
                            "failOverRetryInterval=[{}] ms", zkHosts, sessionTimeoutMs, connectTimeoutMs,
                    retryInterval);
            this.zkClient.blockUntilConnected();
            String zNodePath = "/waterfall/spark/metrics/promethues/"+jobName+"/config";
            this.zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(zNodePath,url.getBytes(StandardCharsets.US_ASCII));
        }catch (Exception e) {
            throw new RuntimeException("启动prometheus metrics失败",e);
        }
        this.prometheusReporter.start(15, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        if(zkClient!=null) {
            zkClient.close();
        }
        if(this.httpServer!=null)
            this.httpServer.close();
        this.prometheusReporter.stop();
    }

    @Override
    public void report() {
        // judging from the Spark source, report() is only triggered right before the JVM stops
        LOG.info("JVM process is about to stop, report() is invoked once automatically");
        this.prometheusReporter.report();
    }
}
Service discovery source
package ...;

import ....config.zookeeper.ZookeeperConfigProperties;
import ....config.mvc.exception.BusinessException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class FlinkServiceDiscovery implements Closeable {
    private final static Logger LOG = LoggerFactory.getLogger(FlinkServiceDiscovery.class);

    private final ZookeeperConfigProperties zkConfig;
    private final Path parentPath;
    private CuratorFramework zkClient;
    private TreeCache treeCache;

    // currently running flink tasks: task name -> set of metric endpoint URLs
    private final Map<String, Set<String>> activeServiceMap = new HashMap<>();
    private final ReadWriteLock serviceMapLock = new ReentrantReadWriteLock();
    // the two fields below make sure the Spring context only starts after the first TreeCache initialization succeeds
    private final AtomicBoolean hasStart = new AtomicBoolean(false);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    public FlinkServiceDiscovery(ZookeeperConfigProperties zkConfig) {
        this.zkConfig = zkConfig;
        parentPath = Paths.get(this.zkConfig.getFlinkMetricsZnodePath());
    }

    
    public boolean start() throws InterruptedException {
        LOG.info("start flink metrics server的服务发现功能,连接的zookeeper:[{}],监控的ZNode Path:[{}]",
                this.zkConfig.getZkHosts(), this.zkConfig.getFlinkMetricsZnodePath());
        this.zkClient = CuratorframeworkFactory
                .newClient(this.zkConfig.getZkHosts(), new RetryForever(3000));
        this.zkClient.start();
        //等待连接建立成功
        this.zkClient.blockUntilConnected();
        try {
            Stat stat = this.zkClient.checkExists()
                    .forPath(this.zkConfig.getFlinkMetricsZnodePath());
            if (stat == null) {
                LOG.warn("不存在ZNode路径:[{}],因此不会启动服务发现功能",this.zkConfig.getFlinkMetricsZnodePath());
                return false;
            }
            LOG.info("开始对ZNode:[{}]建立监听", this.zkConfig.getFlinkMetricsZnodePath());
            this.treeCache = TreeCache.newBuilder(this.zkClient, this.zkConfig.getFlinkMetricsZnodePath())
                    .setCacheData(true)
                    .build();
            treeCache.getListenable().addListener(this::ZnodeListenerFunction);
            this.treeCache.start();
            return true;
        } catch (Exception exception) {
            LOG.warn("建立服务发现功能失败", exception);
            try {
                this.close();
            } catch (IOException ioException) {
                //do nothing
            }
        }
        return false;
    }

    
    public void waitUntilWatcherReady() throws InterruptedException {
        LOG.info("waiting for the watcher's node cache to initialize");
        if (!this.hasStart.get()) {
            this.countDownLatch.await();
        }
        LOG.info("watcher node cache initialized");
    }

    
    public Map<String, Set<String>> findAllActiveServices() {
        this.serviceMapLock.readLock().lock();
        try {
            // return a defensive copy so callers never observe concurrent modifications
            Map<String, Set<String>> snapshot = new HashMap<>();
            this.activeServiceMap.forEach((task, urls) -> snapshot.put(task, new HashSet<>(urls)));
            return snapshot;
        } finally {
            this.serviceMapLock.readLock().unlock();
        }
    }

    
    private void znodeListenerFunction(CuratorFramework curatorFramework, TreeCacheEvent event) {
        switch (event.getType()) {
            case NODE_ADDED:
                this.addServiceMap(event);
                break;
            case NODE_UPDATED:
                LOG.warn("node updated, {}", event.getData().toString());
                // should not happen in theory
                break;
            case NODE_REMOVED:
                this.deleteServiceMap(event);
                break;
            case CONNECTION_LOST:
            case CONNECTION_SUSPENDED:
                LOG.warn("zookeeper连接丢失,event type={}", event.getType());
                break;
            case INITIALIZED:
                LOG.debug("event type={}", event.getType());
                if (this.hasStart.compareAndSet(false, true)) {
                    this.countDownLatch.countDown();
                }
                break;
            default:
                LOG.debug("event type={}", event.getType());
        }
    }

    
    private void addServiceMap(TreeCacheEvent addEvent) {
        String watchPath = addEvent.getData().getPath();
        byte[] watchDataArray = addEvent.getData().getData();
        String watchData = "";
        if (watchDataArray != null && watchDataArray.length > 0) {
            watchData = new String(watchDataArray, StandardCharsets.UTF_8);
        }
        LOG.debug("node add,path={},data={}", watchPath, watchData);
        // zookeeper locates resources with a Path-like syntax, so java.nio.Path is used to process it
        Path path = Paths.get(watchPath);
        Path subPath = this.parentPath.relativize(path);
        if (subPath.getNameCount() == 2) {
            String taskName = subPath.getName(0).toString();
            try {
                this.serviceMapLock.writeLock().lock();
                if (this.activeServiceMap.containsKey(taskName)) {
                    Set<String> set = this.activeServiceMap.get(taskName);
                    set.add(watchData);
                } else {
                    Set<String> set = new HashSet<>();
                    set.add(watchData);
                    this.activeServiceMap.put(taskName, set);
                }
            } finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else
            LOG.warn("ADD变化的ZNode Path={}", watchPath);
    }

    
    private void deleteServiceMap(TreeCacheEvent delEvent) {
        String watchPath = delEvent.getData().getPath();
        byte[] watchDataArray = delEvent.getData().getData();
        String watchData = "";
        if (watchDataArray != null && watchDataArray.length > 0) {
            watchData = new String(watchDataArray, StandardCharsets.UTF_8);
        }
        LOG.debug("node delete,path={},data={}", watchPath, watchData);
        Path path = Paths.get(watchPath);
        Path subPath = this.parentPath.relativize(path);
        if (subPath.getNameCount() == 1) {
            String taskName = subPath.getFileName().toString();
            LOG.warn("ZNode被手动删除,Path={}",watchPath);
            if(!StringUtils.hasText(taskName))
                return;
            try {
                this.serviceMapLock.writeLock().lock();
                this.activeServiceMap.remove(taskName);
            }finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else if (subPath.getNameCount() == 2) {
            String taskName = subPath.getName(0).toString();
            LOG.debug("flink task:[{}], metric server:[{}]的进程结束",taskName,watchData);
            try {
                this.serviceMapLock.writeLock().lock();
                if(this.activeServiceMap.containsKey(taskName)) {
                    this.activeServiceMap.get(taskName).remove(watchData);
                    if(this.activeServiceMap.get(taskName).isEmpty()) {
                        this.activeServiceMap.remove(taskName);
                    }
                }
            }finally {
                this.serviceMapLock.writeLock().unlock();
            }
        } else
            LOG.warn("DELETE变化的ZNode Path={}", watchPath);
    }

    @Override
    public void close() throws IOException {
        LOG.info("关闭flink metrics服务发现功能");
        if (this.treeCache != null)
            this.treeCache.close();
        if (this.zkClient != null)
            this.zkClient.close();
    }
}
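For completeness, the sketch below shows one way the discovered endpoints could be served to Prometheus in its HTTP SD JSON shape from a Spring Boot controller. The controller name, the /prometheus/targets path and the label name are assumptions (as is FlinkServiceDiscovery being registered as a bean); only findAllActiveServices() comes from the class above. Since the sink registers full .../metrics URLs, the sketch trims them down to the host:port targets Prometheus expects:

package ...;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;
import java.util.stream.Collectors;

// Hypothetical controller: exposes the discovered spark metric endpoints in the
// JSON shape Prometheus HTTP SD expects: [{"targets": ["host:port"], "labels": {...}}]
@RestController
public class PrometheusHttpSdController {

    private final FlinkServiceDiscovery discovery;

    public PrometheusHttpSdController(FlinkServiceDiscovery discovery) {
        this.discovery = discovery;
    }

    @GetMapping(value = "/prometheus/targets", produces = MediaType.APPLICATION_JSON_VALUE)
    public List<Map<String, Object>> targets() {
        List<Map<String, Object>> groups = new ArrayList<>();
        discovery.findAllActiveServices().forEach((jobName, urls) -> {
            // the sink registers full URLs (http://ip:port/metrics); Prometheus wants bare host:port
            List<String> targets = urls.stream()
                    .map(u -> u.replace("http://", "").replace("/metrics", ""))
                    .collect(Collectors.toList());
            Map<String, Object> group = new HashMap<>();
            group.put("targets", targets);
            group.put("labels", Collections.singletonMap("spark_job", jobName));
            groups.add(group);
        });
        return groups;
    }
}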
How to collect Spark metrics

The data structures used by Spark's metrics system are not compatible with Prometheus, so a dedicated Reporter class is needed to convert between the two.

MetricsReporter source
package ...;

import com.codahale.metrics.*;

import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class PrometheusReporter extends ScheduledReporter {
    private static final Pattern APPLICATION_ID_REGEX = Pattern.compile("(application_\\d+_\\d+)");

    private static io.prometheus.client.Gauge GAUGE_COLLECTOR = null;

    // NOTE: this ties in with the spark-submit source: the AM blocks (100s by default) waiting for the driver-side SparkContext to finish initializing
    private static AtomicInteger IDENTITY_COUNTER = new AtomicInteger(0);  // 1: driver, 2: AM

    public PrometheusReporter(MetricRegistry registry, String jobName) {
        super(registry, "prometheus-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
        jobName = jobName.replaceAll("\\.", "_");
        String gaugeNameSuffix;
        if (IDENTITY_COUNTER.incrementAndGet() == 1) {
            gaugeNameSuffix = "_driver";
        } else {
            gaugeNameSuffix = "_am";
        }
        GAUGE_COLLECTOR = io.prometheus.client.Gauge.build("spark_guage_" + jobName + gaugeNameSuffix,
                "spark gauge metrics")
                .labelNames("application_id", "origin_lable").register();
    }

    @SuppressWarnings("all")
    @Override
    public void report(SortedMap<String, Gauge> gaugeMap,
                       SortedMap<String, Counter> countMap,
                       SortedMap<String, Histogram> histogramMap,
                       SortedMap<String, Meter> meterMap,
                       SortedMap<String, Timer> timerMap) {
        
        if (gaugeMap != null) {
            for (Map.Entry<String, Gauge> gaugeEntry : gaugeMap.entrySet()) {
                String metricName = gaugeEntry.getKey();
                String[] splitKeyworlds = this.spilitMetricName(metricName);
                Gauge gauge = gaugeEntry.getValue();
                if(gauge.getValue() instanceof Number) {
                    GAUGE_COLLECTOR.labels(splitKeyworlds[1],splitKeyworlds[0]).set(Double.parseDouble(String.valueOf(gauge.getValue())));
                }
            }
        }

        if(countMap!=null) {
            for (Map.Entry<String, Counter> counterEntry : countMap.entrySet()) {
                String metricName = counterEntry.getKey();
                String[] splitKeyworlds = this.spilitMetricName(metricName);
                Counter counter = counterEntry.getValue();
                GAUGE_COLLECTOR.labels(splitKeyworlds[1],splitKeyworlds[0])
                        .set(counter.getCount());
            }
        }
    }

    private String[] spilitMetricName(String metricName) {
        String[] result = new String[2];
        result[0] = metricName;
        Matcher matcher = APPLICATION_ID_REGEX.matcher(metricName);
        if(matcher.find()) {
            result[0] = metricName.replaceAll("application_\\d+_\\d+","appid");
            result[1] = matcher.group(1);
        }else {
            result[1] = "NaN";
        }
        result[0] = result[0].replaceAll("\\.","_");
        return result;
    }


}
Enabling the Prometheus sink via Spark conf

There are several ways to do this, e.g. adding entries to ${SPARK_HOME}/conf/metrics.properties. I personally prefer passing the settings with spark-submit --conf.

spark-submit --master yarn --deploy-mode cluster \
--conf spark.metrics.conf.*.sink.prometheus.class=...spark.metric.sink.PrometheusSink \
--conf spark.metrics.conf.*.sink.prometheus.zkHosts=xxx:2181 \
--conf spark.metrics.conf.*.sink.prometheus.sessionTimeoutMs=30000 \
--conf spark.metrics.conf.*.sink.prometheus.connectTimeoutMs=10000 \
--conf spark.metrics.conf.*.sink.prometheus.retryIntervalMs=3000 \
--conf spark.metrics.conf.*.sink.prometheus.jobName=MyJob1 \
--jars moduleB.jar,moduleC.jar \
--class mainClass \
/tmp/moduleA.jar

At this point all the development work described in the official metrics system documentation is done, but once the job is running you will hit the following problem:

The Prometheus sink startup problem
22/01/27 10:20:16 ERROR metrics.MetricsSystem: Sink class ....spark.metric.sink.PrometheusSink cannot be instantiated
22/01/27 10:20:16 ERROR yarn.ApplicationMaster: Uncaught exception: 
java.lang.ClassNotFoundException: ....spark.metric.sink.PrometheusSink
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:242)
	at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks.apply(MetricsSystem.scala:198)
	at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks.apply(MetricsSystem.scala:194)
	at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:130)
	at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:130)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
	at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
	at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
	at org.apache.spark.deploy.yarn.ApplicationMaster.createAllocator(ApplicationMaster.scala:433)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:460)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:275)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:805)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:804)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:804)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

The spark-submit command passes the Prometheus sink code (which lives in ModuleB) via --jars moduleB.jar, so in theory this should just work.

Spark Metrics System

The Spark metrics system documentation contains this passage:

syntax: [instance].sink|source.[name].[options]=[value]

#  This file configures Spark's internal metrics system. The metrics system is
#  divided into instances which correspond to internal components.
#  Each instance can be configured to report its metrics to one or more sinks.
#  Accepted values for [instance] are "master", "worker", "executor", "driver",
#  and "applications". A wildcard "*" can be used as an instance name, in
#  which case all instances will inherit the supplied property.

In other words, a running Spark-on-YARN job has more than one metrics instance: the driver, the executors and, as the stack trace above shows, the ApplicationMaster itself. Our configuration used the wildcard *, so the Prometheus sink was attached to every one of them. I then changed the configuration:

--conf spark.metrics.conf.driver.sink.prometheus.class=...spark.metric.sink.PrometheusSink 
--conf spark.metrics.conf.driver.sink.prometheus.zkHosts=xxx:2181 
--conf spark.metrics.conf.driver.sink.prometheus.sessionTimeoutMs=30000 
--conf spark.metrics.conf.driver.sink.prometheus.connectTimeoutMs=10000 
--conf spark.metrics.conf.driver.sink.prometheus.retryIntervalMs=3000 
--conf spark.metrics.conf.driver.sink.prometheus.jobName=MyJob1 

Re-running spark-submit with this configuration works fine. (Note: if you change driver to executor in the configuration above, you still get a ClassNotFoundException; the cause is the same as for the ApplicationMaster.)

This shows that the Spark driver and the YARN AM do not load the same jars, so it is time to look at the source code.

Source analysis of the Spark startup process

spark-submit.sh

spark-submit.sh (via spark-class) does little more than assemble a java command line that runs org.apache.spark.deploy.SparkSubmit with the original arguments, so the analysis starts there.

SparkSubmit execution flow

The main job of this class is to parse the spark-submit arguments and then invoke the main method of org.apache.spark.deploy.yarn.YarnClusterApplication via reflection. The source is long, so we only look at a few important points.

--jars, --driver-library-path and --driver-class-path

After SparkSubmit's processing, and before entering YarnClusterApplication, three Spark configs have been constructed:

spark.yarn.dist.jars: equivalent to the jar list passed via --jars, after some URI normalization (relative paths resolved to absolute ones, glob paths expanded, FTP files downloaded locally, and so on).
spark.driver.extraClassPath: equivalent to --driver-class-path, unchanged.
spark.driver.extraLibraryPath: equivalent to --driver-library-path, unchanged.
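Purely as an illustration (the paths below are made up), the spark-submit command shown earlier would roughly translate into the following SparkConf entries by the time YarnClusterApplication sees them:

spark.yarn.dist.jars=file:/home/work/moduleB.jar,file:/home/work/moduleC.jar
spark.driver.extraClassPath=(whatever --driver-class-path contained, unchanged)
spark.driver.extraLibraryPath=(whatever --driver-library-path contained, unchanged)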
spark main jar

The Spark main jar is called the primary resource in the source. SparkSubmit turns it into the arguments of YarnClusterApplication's main method, in the form:

--jar moduleA.jar
--class moduleA.mainClassName
--arg moduleA.main.args

That is the end of SparkSubmit's logic; execution moves into YarnClusterApplication.

YarnClusterApplication

This class launches the YARN AM, and inside the AM starts the Spark driver code (i.e. ModuleA).

The key question is how the AM's jars and CLASSPATH get set up.

AM launch flow
def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Important! This builds the AM container's launch context; everything related to jars and the CLASSPATH happens in here
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }
Setting the AM CLASSPATH environment variable
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")
    val appId = newAppResponse.getApplicationId
    val appStagingDirPath = new Path(appStagingbaseDir, getAppStagingDir(appId))
    val pySparkArchives =
      if (sparkConf.get(IS_PYTHON_APP)) {
        findPySparkArchives()
      } else {
        Nil
      }

    // appStagingDirPath = hdfs://nameservice1/user/spark_cdh/.sparkStaging
    // pySparkArchives: not relevant here
    // build the AM's launch environment, which includes CLASSPATH
    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
    // set up the resources the YARN container will download, e.g. jars and log4j.properties
    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setLocalResources(localResources.asJava)
    amContainer.setEnvironment(launchEnv.asJava)

    // what follows builds the AM launch command, essentially a `java -server ...` command line
  }
private def setupLaunchEnv(
      stagingDirPath: Path,
      pySparkArchives: Seq[String]): HashMap[String, String] = {
    logInfo("Setting up the launch environment for our AM container")
    val env = new HashMap[String, String]()

    // Important! The AM classpath can be extended via this config entry,
    // which in turn is set from --driver-class-path
    // DRIVER_CLASS_PATH = spark.driver.extraClassPath
    
    populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
    env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()

    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
    val amEnvPrefix = "spark.yarn.appMasterEnv."

    
    sparkConf.getAll
      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }

    // SPARK_DIST_CLASSPATH
    
    sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
      env(ENV_DIST_CLASSPATH) = dcp
    }

    env
  }
Uploading container resources to HDFS

This method is also long; the only things we need to know are:

The Spark primary resource (i.e. moduleA.jar) is uploaded to HDFS and renamed __app__.jar.
The files given in --jars are uploaded to HDFS as well.
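The resulting staging directory on HDFS therefore looks roughly like this (a sketch only; the application id is a placeholder, and whether Spark's own libraries and __spark_conf__.zip appear depends on the cluster's spark.yarn.jars / spark.yarn.archive settings):

hdfs://nameservice1/user/spark_cdh/.sparkStaging/application_xxxx_yyyy/
    __app__.jar          # the primary resource, i.e. moduleA.jar renamed
    moduleB.jar          # from --jars
    moduleC.jar          # from --jars
    __spark_conf__.zip   # archived Spark/Hadoop configuration for the AM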
Launching the AM

The core of this step is generating a launch command whose content is roughly:

LD_LIBRARY_PATH="{spark.driver.extraLibraryPath}:$LD_LIBRARY_PATH"
{{JAVA_HOME}}/bin/java  
               -server  
               org.apache.spark.deploy.yarn.ApplicationMaster
               --class MyModuleAMainClass 
               --jar ModuleA.jar
               --arg 
               1> <LOG_DIR>/stdout  
               2> <LOG_DIR>/stderr
               

[Something I have not fully figured out] spark.driver.extraLibraryPath (i.e. what --driver-library-path points at) does not affect the AM's CLASSPATH, so what is the point of setting LD_LIBRARY_PATH here? (LD_LIBRARY_PATH only influences native library loading, not class loading.)

What the command above shows is that the jars passed via spark-submit --jars are not put on the AM's CLASSPATH. Yet the Spark driver still starts, which implies the AM and the driver use different classloaders, as the ApplicationMaster source below confirms:

ApplicationMaster 
// 1. the AM entry point (main thread)
def main(args: Array[String]): Unit = {
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args)
    master = new ApplicationMaster(amArgs)
    System.exit(master.run())
  }

// 2. kerberos: run as the proper user
final def run(): Int = {
    doAsUser {
      runImpl()
    }
    exitCode
  }

private def runImpl(): Unit = {
      if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
  }


private def runDriver(): Unit = {
    addAmIpFilter(None)
    // Important! start the Spark driver
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      // Important! wait for SparkContext initialization to finish (i.e. the driver started successfully); not forever, 100s by default
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)

        // Important! this is where the AM-side metrics system, and hence the prometheus sink, gets started
        createAllocator(driverRef, userConf)
      }
  }




private def startUserApplication(): Thread = {
    // userClassLoader loads __app__.jar (i.e. moduleA.jar) and every jar passed via spark-submit --jars
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      // invokes moduleA.jar's main method, which initializes the SparkContext
    }
    userThread.setContextClassLoader(userClassLoader)
    userThread.setName("Driver")
    userThread.start()
    userThread
  }
private val userClassLoader = {
    val classpath = Client.getUserClasspath(sparkConf)
    val urls = classpath.map { entry =>
      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
    }

    if (isClusterMode) {
      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      } else {
        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      }
    } else {
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    }
  }
Fixing the ClassNotFoundException

So now we know: the AM's default CLASSPATH contains none of moduleA.jar, moduleB.jar or moduleC.jar, so it is no surprise the Prometheus sink class cannot be found. Add the following so the AM picks up moduleB.jar:

spark-submit ... --driver-class-path moduleB.jar --class xxx /moduleA.jar ...

As mentioned in the background section, moduleA, moduleB and moduleC reference one another's classes, so with the configuration above the AM's JVM ends up with this classloader layout: the system AppClassLoader sees moduleB.jar (via --driver-class-path), while the child MutableURLClassLoader that runs the user code sees moduleA.jar, moduleB.jar and moduleC.jar (via __app__.jar and --jars).

With this layout, when ModuleA starts it loads ClassB from ModuleB; ClassB is resolved by the parent AppClassLoader, so when ClassB in turn needs ClassC from ModuleC, the lookup goes through ClassB's defining loader, the parent, which cannot see moduleC.jar, and a ClassNotFoundException for ClassC is thrown.
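The failure can be reproduced outside Spark with two URLClassLoaders wired the same way. This is only a sketch: the jar paths and the ClassB/ClassC names are made-up stand-ins for our modules, not anything taken from Spark:

import java.net.URL;
import java.net.URLClassLoader;

public class DelegationDemo {
    public static void main(String[] args) throws Exception {
        // Parent loader only sees moduleB.jar (like the AM's system classpath
        // extended via --driver-class-path).
        URLClassLoader parent = new URLClassLoader(
                new URL[]{new URL("file:/tmp/moduleB.jar")}, null);
        // Child loader sees all three modules (like the AM's userClassLoader
        // built from __app__.jar and --jars).
        URLClassLoader child = new URLClassLoader(
                new URL[]{new URL("file:/tmp/moduleA.jar"),
                          new URL("file:/tmp/moduleB.jar"),
                          new URL("file:/tmp/moduleC.jar")}, parent);

        // With parent-first delegation, ClassB is found and defined by `parent`.
        // When ClassB later references ClassC, resolution goes through ClassB's
        // defining loader -- the parent -- which cannot see moduleC.jar, so the
        // JVM throws NoClassDefFoundError / ClassNotFoundException for ClassC.
        Class<?> classB = child.loadClass("com.example.moduleb.ClassB");
        System.out.println("ClassB loaded by: " + classB.getClassLoader());
    }
}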

To fix this, make sure the jars passed via --driver-class-path do not reference classes from --jars or from the main jar (the other direction is fine: the main jar and the --jars jars may reference the --driver-class-path jars). My solution was to move the Prometheus sink code into a standalone jar.

The executor also hits ClassNotFoundException

The cause is the same as for the AM; adding the following to spark-submit fixes it.

--conf spark.executor.extraClassPath=spark-prometheus-metrics-1.0.jar
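
Putting it all together, the final submit command looks roughly like the sketch below. The spark-prometheus-metrics-1.0.jar name and the paths are illustrative; the relative jar name works for both the AM and the executors because --jars ships the file into every container's working directory:

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.metrics.conf.*.sink.prometheus.class=...spark.metric.sink.PrometheusSink \
  --conf spark.metrics.conf.*.sink.prometheus.zkHosts=xxx:2181 \
  --conf spark.metrics.conf.*.sink.prometheus.jobName=MyJob1 \
  --conf spark.executor.extraClassPath=spark-prometheus-metrics-1.0.jar \
  --driver-class-path spark-prometheus-metrics-1.0.jar \
  --jars moduleB.jar,moduleC.jar,spark-prometheus-metrics-1.0.jar \
  --class mainClass \
  /tmp/moduleA.jar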
