Shandong University Software Engineering Application and Practice: Spark (7) Code Analysis


2021SC@SDUSC


Table of Contents

SparkUI

1. listenerBus

2. Constructing JobProgressListener


SparkUI

1. listenerBus

listenerBus is of type LiveListenerBus. LiveListenerBus implements a listener model: posted events trigger updates to the monitoring state held by the various listeners, which is how the data shown on the UI gets refreshed. It consists of the following parts:

  • An event blocking queue, of type LinkedBlockingQueue[SparkListenerEvent]
  • A listener array, of type ArrayBuffer[SparkListener]
  • A thread that matches events to listeners
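The interplay of these three parts can be sketched independently of Spark. Below is a minimal, hypothetical event bus (names such as MiniBus and MiniListener are illustrative, not Spark API) with a bounded queue, a listener buffer, and a daemon dispatch thread gated by a semaphore, mirroring the structure described above:

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import scala.collection.mutable.ArrayBuffer

// Minimal sketch of the listener-bus pattern: events go into a bounded queue,
// a semaphore counts pending events, and a daemon thread drains the queue and
// forwards each event to every registered listener.
trait MiniListener { def onEvent(e: String): Unit }

class MiniBus(capacity: Int = 10000) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  private val listeners = new ArrayBuffer[MiniListener]
  private val pending = new Semaphore(0)
  @volatile private var stopped = false

  private val thread = new Thread("MiniBus") {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        pending.acquire()                // block until an event is available
        Option(queue.poll()).foreach { e =>
          listeners.synchronized { listeners.foreach(_.onEvent(e)) }
        }
      }
    }
  }

  def addListener(l: MiniListener): Unit =
    listeners.synchronized { listeners += l }
  def start(): Unit = thread.start()
  // offer() is non-blocking: if the queue is full the event is dropped,
  // just as LiveListenerBus logs a queue-full error instead of blocking.
  def post(e: String): Unit = if (queue.offer(e)) pending.release()
  def stop(): Unit = { stopped = true; pending.release(); thread.join() }
}
```

The semaphore plays the same role as eventLock in the real code: post releases one permit per enqueued event, so the dispatch thread sleeps when the queue is empty instead of spinning.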

The code of listenerBus is as follows:

private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)

private val listenerThread = new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run(): Unit = Utils.logUncaughtExceptions {
        while (true) {
            eventLock.acquire()
            // Atomically remove and process this event
            LiveListenerBus.this.synchronized {
                val event = eventQueue.poll
                if (event == SparkListenerShutdown) {
                    // Get out of the while loop and shutdown the daemon thread
                    return
                }
                Option(event).foreach(postToAll)
            }
        }
    }
}

def start() {
    if (started) {
        throw new IllegalStateException("Listener bus already started!")
    }
    listenerThread.start()
    started = true
}

def post(event: SparkListenerEvent) {
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
        eventLock.release()
    } else {
        logQueueFullErrorMessage()
    }
}

def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

def stop() {
    if (!started) {
        throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    }
    post(SparkListenerShutdown)
    listenerThread.join()
}

The postToAll method invoked by LiveListenerBus is actually defined in its parent class, SparkListenerBus. Its code is as follows:

protected val sparkListeners = new ArrayBuffer[SparkListener]
    with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
    sparkListeners += listener
}

def postToAll(event: SparkListenerEvent) {
    event match {
        case stageSubmitted: SparkListenerStageSubmitted =>
            foreachListener(_.onStageSubmitted(stageSubmitted))
        case stageCompleted: SparkListenerStageCompleted =>
            foreachListener(_.onStageCompleted(stageCompleted))
        case jobStart: SparkListenerJobStart =>
            foreachListener(_.onJobStart(jobStart))
        case jobEnd: SparkListenerJobEnd =>
            foreachListener(_.onJobEnd(jobEnd))
        case taskStart: SparkListenerTaskStart =>
            foreachListener(_.onTaskStart(taskStart))
        case taskGettingResult: SparkListenerTaskGettingResult =>
            foreachListener(_.onTaskGettingResult(taskGettingResult))
        case taskEnd: SparkListenerTaskEnd =>
            foreachListener(_.onTaskEnd(taskEnd))
        case environmentUpdate: SparkListenerEnvironmentUpdate =>
            foreachListener(_.onEnvironmentUpdate(environmentUpdate))
        case blockManagerAdded: SparkListenerBlockManagerAdded =>
            foreachListener(_.onBlockManagerAdded(blockManagerAdded))
        case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
        case unpersistRDD: SparkListenerUnpersistRDD =>
            foreachListener(_.onUnpersistRDD(unpersistRDD))
        case applicationStart: SparkListenerApplicationStart =>
            foreachListener(_.onApplicationStart(applicationStart))
        case applicationEnd: SparkListenerApplicationEnd =>
            foreachListener(_.onApplicationEnd(applicationEnd))
        case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
        case SparkListenerShutdown =>
    }
}

private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
        try {
            f(listener)
        } catch {
            case e: Exception =>
                logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
        }
    }
}
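The match-based dispatch above can be exercised in isolation. The following hypothetical sketch (the event hierarchy and Listener trait are stand-ins, not Spark's API) shows how each event case fans out to the matching callback on every registered listener, with per-listener failures isolated exactly as foreachListener does:

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in event hierarchy and listener trait (illustrative, not Spark's).
sealed trait Event
case class JobStart(jobId: Int) extends Event
case class JobEnd(jobId: Int) extends Event
case object Shutdown extends Event

trait Listener {
  def onJobStart(e: JobStart): Unit = {}
  def onJobEnd(e: JobEnd): Unit = {}
}

object Dispatcher {
  private val listeners = ArrayBuffer[Listener]()
  def addListener(l: Listener): Unit = listeners += l

  // Mirrors SparkListenerBus.postToAll: match on the concrete event type,
  // then invoke the corresponding handler on every listener.
  def postToAll(event: Event): Unit = event match {
    case e: JobStart => foreachListener(_.onJobStart(e))
    case e: JobEnd   => foreachListener(_.onJobEnd(e))
    case Shutdown    => // consumed by the bus itself, not forwarded
  }

  // One misbehaving listener must not prevent the others from seeing the event.
  private def foreachListener(f: Listener => Unit): Unit =
    listeners.foreach { l =>
      try f(l) catch { case e: Exception => println(s"Listener threw: $e") }
    }
}
```

Note the design choice this pattern encodes: the bus, not the listener, decides which handler to call, so listeners only override the callbacks they care about and ignore the rest.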

2. Constructing JobProgressListener

JobProgressListener is an important component of SparkContext; it updates task progress by listening to the events on listenerBus. The code that creates JobProgressListener is as follows:

private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)

val statusTracker = new SparkStatusTracker(this)

JobProgressListener stores JobIds and their corresponding JobUIData in data structures such as HashMap and ListBuffer, bucketed by job state (active, completed, failed, and so on). StageIds, StageInfos, and related information are likewise bucketed by stage state (active, completed, skipped, failed), and the one-to-many relationship from StageId to JobId is stored as well. The data structures of JobProgressListener are shown below:

class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

    import JobProgressListener._

    type JobId = Int
    type StageId = Int
    type StageAttemptId = Int
    type PoolName = String
    type ExecutorId = String

    // Jobs:
    val activeJobs = new HashMap[JobId, JobUIData]
    val completedJobs = ListBuffer[JobUIData]()

    // Stages:
    val skippedStages = ListBuffer[StageInfo]()
    val failedStages = ListBuffer[StageInfo]()
    val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
    val stageIdToInfo = new HashMap[StageId, StageInfo]
    val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
    val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
    var numCompletedStages = 0
    var numFailedStages = 0

    // Misc:
    val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
    def blockManagerIds = executorIdToBlockManagerId.values.toSeq

    var schedulingMode: Option[SchedulingMode] = None

    val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)

    val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

JobProgressListener implements methods such as onJobStart, onJobEnd, onStageCompleted, onStageSubmitted, onTaskStart, and onTaskEnd.
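How those handlers keep the UI's bookkeeping consistent can be sketched with the same data structures. The following is a hypothetical, heavily simplified tracker (not the real JobProgressListener) that moves a job from the active HashMap to the completed ListBuffer and maintains the one-to-many StageId-to-JobId index:

```scala
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

// Simplified job-progress bookkeeping in the style of JobProgressListener:
// active jobs live in a HashMap keyed by JobId; on completion they move to a
// ListBuffer; each stage may belong to several jobs, hence HashSet[JobId].
class MiniProgressTracker {
  type JobId = Int
  type StageId = Int

  val activeJobs = new HashMap[JobId, String]          // jobId -> status
  val completedJobs = ListBuffer[JobId]()
  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]

  def onJobStart(jobId: JobId, stageIds: Seq[StageId]): Unit = {
    activeJobs(jobId) = "RUNNING"
    // Record the one-to-many relationship: every stage of this job
    // gains jobId in its active-job set.
    for (s <- stageIds)
      stageIdToActiveJobIds.getOrElseUpdate(s, new HashSet[JobId]) += jobId
  }

  def onJobEnd(jobId: JobId): Unit = {
    activeJobs.remove(jobId)
    completedJobs += jobId
    // Drop the finished job from every stage's active-job set.
    for ((_, jobs) <- stageIdToActiveJobIds) jobs -= jobId
  }
}
```

A stage shared by two concurrent jobs (as happens when jobs reuse cached RDD lineage) ends up with both JobIds in its HashSet, which is exactly why the index maps to a set rather than a single JobId.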

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original source: http://outofmemory.cn/zaji/5638633.html
