2021SC@SDUSC
Contents
SparkUI
1. listenerBus
2. Constructing JobProgressListener

SparkUI

1. listenerBus
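listenerBus is of type LiveListenerBus. LiveListenerBus implements the listener model: posted events trigger updates to the state held by the various monitoring listeners, which is how the data shown in the UI gets refreshed. It consists of the following parts:
- An event blocking queue, of type LinkedBlockingQueue[SparkListenerEvent]
- A listener buffer, of type ArrayBuffer[SparkListener]
- A thread that matches events to listeners
The code of listenerBus is as follows: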
    // Excerpt from LiveListenerBus
    private val EVENT_QUEUE_CAPACITY = 10000
    private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
    private var queueFullErrorMessageLogged = false
    private var started = false

    // A counter that represents the number of events produced and consumed in the queue
    private val eventLock = new Semaphore(0)

    private val listenerThread = new Thread("SparkListenerBus") {
      setDaemon(true)
      override def run(): Unit = Utils.logUncaughtExceptions {
        while (true) {
          eventLock.acquire()
          // Atomically remove and process this event
          LiveListenerBus.this.synchronized {
            val event = eventQueue.poll
            if (event == SparkListenerShutdown) {
              // Get out of the while loop and shutdown the daemon thread
              return
            }
            Option(event).foreach(postToAll)
          }
        }
      }
    }

    def start() {
      if (started) {
        throw new IllegalStateException("Listener bus already started!")
      }
      listenerThread.start()
      started = true
    }

    def post(event: SparkListenerEvent) {
      // offer() does not block: it returns false when the queue is full
      val eventAdded = eventQueue.offer(event)
      if (eventAdded) {
        eventLock.release()
      } else {
        logQueueFullErrorMessage() // helper not shown in this excerpt
      }
    }

    def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

    def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

    def stop() {
      if (!started) {
        throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
      }
      post(SparkListenerShutdown)
      listenerThread.join()
    }
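To make the interplay of the three parts easier to follow, here is a minimal, self-contained sketch of the same pattern (bounded queue, semaphore, daemon thread, shutdown sentinel). It is not Spark's code: the names MiniListenerBus, Event, JobStart, JobEnd, Shutdown and Listener are hypothetical stand-ins invented for this illustration, and the dispatch logic is reduced to two event types.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import scala.collection.mutable.ArrayBuffer

object MiniListenerBusSketch {
  // Hypothetical event types, standing in for SparkListenerEvent subclasses.
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int) extends Event
  case object Shutdown extends Event

  // Hypothetical listener interface with no-op defaults.
  trait Listener {
    def onJobStart(e: JobStart): Unit = ()
    def onJobEnd(e: JobEnd): Unit = ()
  }

  class MiniListenerBus(capacity: Int) {
    private val eventQueue = new LinkedBlockingQueue[Event](capacity)
    private val eventLock = new Semaphore(0) // one permit per queued event
    private val listeners = new ArrayBuffer[Listener]
    private var started = false

    private val listenerThread = new Thread("MiniListenerBus") {
      setDaemon(true)
      override def run(): Unit = {
        var running = true
        while (running) {
          eventLock.acquire() // blocks until post() has released a permit
          val event = eventQueue.poll()
          if (event == Shutdown) running = false
          else if (event != null) postToAll(event)
        }
      }
    }

    def addListener(l: Listener): Unit = listeners.synchronized { listeners += l }

    def start(): Unit = {
      if (started) throw new IllegalStateException("Listener bus already started!")
      listenerThread.start()
      started = true
    }

    /** Returns false (and drops the event) when the queue is full. */
    def post(event: Event): Boolean = {
      val added = eventQueue.offer(event)
      if (added) eventLock.release()
      added
    }

    def stop(): Unit = {
      if (!started) throw new IllegalStateException("Listener bus has not been started!")
      post(Shutdown)
      listenerThread.join()
    }

    // Dispatch one event to every listener; a failing listener does not affect the others.
    private def postToAll(event: Event): Unit = {
      val snapshot = listeners.synchronized { listeners.toList }
      snapshot.foreach { l =>
        try {
          event match {
            case e: JobStart => l.onJobStart(e)
            case e: JobEnd   => l.onJobEnd(e)
            case Shutdown    => ()
          }
        } catch {
          case ex: Exception => Console.err.println(s"Listener threw an exception: $ex")
        }
      }
    }
  }
}
```

A caller would register listeners with addListener, call start() once, post events from any thread, and finally call stop(), which enqueues the sentinel and waits for the thread to drain everything posted before it. As in the excerpt above, post never blocks the producer, so events are dropped rather than stalling the caller when the queue is full.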
The postToAll method called in LiveListenerBus is actually defined in its parent, SparkListenerBus. The code is as follows:
    // Excerpt from SparkListenerBus
    // Listeners are kept in a synchronized buffer so they can be added from any thread
    protected val sparkListeners = new ArrayBuffer[SparkListener]
      with mutable.SynchronizedBuffer[SparkListener]

    def addListener(listener: SparkListener) {
      sparkListeners += listener
    }

    // Route each event type to the matching callback on every registered listener
    def postToAll(event: SparkListenerEvent) {
      event match {
        case stageSubmitted: SparkListenerStageSubmitted =>
          foreachListener(_.onStageSubmitted(stageSubmitted))
        case stageCompleted: SparkListenerStageCompleted =>
          foreachListener(_.onStageCompleted(stageCompleted))
        case jobStart: SparkListenerJobStart =>
          foreachListener(_.onJobStart(jobStart))
        case jobEnd: SparkListenerJobEnd =>
          foreachListener(_.onJobEnd(jobEnd))
        case taskStart: SparkListenerTaskStart =>
          foreachListener(_.onTaskStart(taskStart))
        case taskGettingResult: SparkListenerTaskGettingResult =>
          foreachListener(_.onTaskGettingResult(taskGettingResult))
        case taskEnd: SparkListenerTaskEnd =>
          foreachListener(_.onTaskEnd(taskEnd))
        case environmentUpdate: SparkListenerEnvironmentUpdate =>
          foreachListener(_.onEnvironmentUpdate(environmentUpdate))
        case blockManagerAdded: SparkListenerBlockManagerAdded =>
          foreachListener(_.onBlockManagerAdded(blockManagerAdded))
        case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
          foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
        case unpersistRDD: SparkListenerUnpersistRDD =>
          foreachListener(_.onUnpersistRDD(unpersistRDD))
        case applicationStart: SparkListenerApplicationStart =>
          foreachListener(_.onApplicationStart(applicationStart))
        case applicationEnd: SparkListenerApplicationEnd =>
          foreachListener(_.onApplicationEnd(applicationEnd))
        case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
          foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
        case SparkListenerShutdown =>
      }
    }

    // Apply f to every listener; an exception in one listener is logged and does not stop the rest
    private def foreachListener(f: SparkListener => Unit): Unit = {
      sparkListeners.foreach { listener =>
        try {
          f(listener)
        } catch {
          case e: Exception =>
            logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
        }
      }
    }

2. Constructing JobProgressListener
JobProgressListener is an important component of SparkContext: it updates job progress by listening to the events on listenerBus. The code that creates JobProgressListener is as follows:
    private[spark] val jobProgressListener = new JobProgressListener(conf)
    listenerBus.addListener(jobProgressListener)

    val statusTracker = new SparkStatusTracker(this)
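JobProgressListener uses data structures such as HashMap and ListBuffer to store each JobId together with its JobUIData, grouped by job status (active, completed, failed). StageId, StageInfo and related information is likewise grouped by stage status (active, completed, skipped, failed), and the listener also records the one-to-many relationship between a StageId and its JobIds. The data-structure code of JobProgressListener is as follows: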
    class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

      import JobProgressListener._

      type JobId = Int
      type StageId = Int
      type StageAttemptId = Int
      type PoolName = String
      type ExecutorId = String

      // Jobs:
      val activeJobs = new HashMap[JobId, JobUIData]
      val completedJobs = ListBuffer[JobUIData]()

      // Stages:
      val skippedStages = ListBuffer[StageInfo]()
      val failedStages = ListBuffer[StageInfo]()
      val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
      val stageIdToInfo = new HashMap[StageId, StageInfo]
      // One stage can belong to several active jobs
      val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
      val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
      // Counters are mutable, so they must be vars
      var numCompletedStages = 0
      var numFailedStages = 0

      // Misc:
      val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
      def blockManagerIds = executorIdToBlockManagerId.values.toSeq

      var schedulingMode: Option[SchedulingMode] = None

      // Upper bounds on how many finished stages and jobs are kept for the UI
      val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
      val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

      // ... (rest of the class omitted)
    }
JobProgressListener implements onJobStart, onJobEnd, onStageCompleted, onStageSubmitted, onTaskStart, onTaskEnd and other callback methods.
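The bookkeeping described above can be illustrated with a small, self-contained sketch. It assumes a hypothetical MiniJobProgressListener and a much-reduced JobData record (neither exists in Spark), takes plain arguments instead of Spark's event objects, and uses a simple drop-the-oldest eviction policy that may differ from what Spark actually does. It only shows how jobs move between the active, completed and failed collections and how the stage-to-jobs mapping is maintained.

```scala
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

object JobProgressSketch {
  type JobId = Int
  type StageId = Int

  // Hypothetical, much smaller stand-in for JobUIData.
  final case class JobData(jobId: JobId, stageIds: Seq[StageId])

  class MiniJobProgressListener(retainedJobs: Int) {
    val activeJobs = new HashMap[JobId, JobData]
    val completedJobs = ListBuffer[JobData]()
    val failedJobs = ListBuffer[JobData]()
    // One stage may be shared by several active jobs.
    val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]

    def onJobStart(jobId: JobId, stageIds: Seq[StageId]): Unit = synchronized {
      activeJobs(jobId) = JobData(jobId, stageIds)
      stageIds.foreach { s =>
        stageIdToActiveJobIds.getOrElseUpdate(s, new HashSet[JobId]) += jobId
      }
    }

    def onJobEnd(jobId: JobId, succeeded: Boolean): Unit = synchronized {
      activeJobs.remove(jobId).foreach { job =>
        // Move the job from the active map to the list matching its outcome.
        val target = if (succeeded) completedJobs else failedJobs
        target += job
        trimIfNecessary(target)
        // Detach the job from its stages; forget stages with no active job left.
        job.stageIds.foreach { s =>
          stageIdToActiveJobIds.get(s).foreach { jobs =>
            jobs -= jobId
            if (jobs.isEmpty) stageIdToActiveJobIds.remove(s)
          }
        }
      }
    }

    // Assumed eviction policy: drop the oldest entries beyond the retention limit.
    private def trimIfNecessary(jobs: ListBuffer[JobData]): Unit = {
      if (jobs.size > retainedJobs) jobs.remove(0, jobs.size - retainedJobs)
    }
  }
}
```

The retention limit plays the role that spark.ui.retainedJobs plays in the real class: without some cap, a long-running application would accumulate finished jobs in memory indefinitely.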