Spark Analysis (10): Spark Streaming Execution Flow in Detail (5)


2021SC@SDUSC

Preface

The previous post finished analyzing the initial data-receiving flow of Spark Streaming. This post moves on to Spark Streaming's data cleanup.

Spark Streaming Data Cleanup

A Spark Streaming application runs continuously. If memory resources are not managed effectively, memory can be exhausted very quickly, so Spark Streaming has its own cleanup mechanism for objects, data, and metadata.
These objects, data, and metadata are produced while operating on DStreams.
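To make this concrete, here is a minimal sketch of a streaming app (a hypothetical word count; the socket source, port, and durations are illustrative, not from the original post). Every batch interval, the DStream lineage below materializes new RDDs plus scheduler metadata, and it is exactly this per-batch state that the cleanup flow analyzed here reclaims:

// Hypothetical word-count app; the per-batch RDDs and metadata it produces
// are what the cleanup flow reclaims after each batch completes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CleanupDemo {
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("CleanupDemo").setMaster("local[2]")
		val ssc = new StreamingContext(conf, Seconds(10)) // one batch every 10 seconds

		// Optional: keep each batch's RDDs/metadata longer than the default;
		// this feeds into graph.getMaxInputStreamRememberDuration() seen later
		ssc.remember(Seconds(60))

		val lines = ssc.socketTextStream("localhost", 9999) // illustrative source
		lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

		ssc.start()
		ssc.awaitTermination()
	}
}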
First, the overall flow chart of data cleanup:

Figure 1: overall flow of Spark Streaming data cleanup

One step earlier in the flow is job.run, executed inside JobScheduler.JobHandler.run:

// JobScheduler.JobHandler.run

def run() {
	try {
		...
		var _eventLoop = eventLoop
		if (_eventLoop != null) {
			_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
			PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
				job.run()
			}
			_eventLoop = eventLoop
			if (_eventLoop != null) {
				_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
			}
		} else {
			// JobScheduler has been stopped
		}
	} finally {
		ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
		ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
	}
}
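The disableOutputSpecValidation.withValue(true) { ... } idiom above comes from Scala's standard scala.util.DynamicVariable: the value is in effect only for the dynamic extent of the block, then reverts. A minimal self-contained demonstration (the variable name flag is illustrative):

// Demonstrates the DynamicVariable.withValue pattern used in run() above
import scala.util.DynamicVariable

object DynVarDemo {
	val flag = new DynamicVariable[Boolean](false)

	def main(args: Array[String]): Unit = {
		println(flag.value)        // false: the default
		flag.withValue(true) {
			println(flag.value)    // true, but only inside this block
		}
		println(flag.value)        // false again after the block exits
	}
}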
	

run posts a JobCompleted message. JobScheduler.processEvent defines how such messages are handled:

// JobScheduler.processEvent

private def processEvent(event: JobSchedulerEvent) {
	try {
		event match {
			case JobStarted(job, startTime) => handleJobStart(job, startTime)
			case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
			case ErrorReported(m, e) => handleError(m, e)
		}
	} catch {
		case e: Throwable =>
			reportError("Error in job scheduler", e)
	}
}
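For reference, the three events matched above are simple case classes defined alongside JobScheduler (paraphrased from Spark's scheduler source; field names may differ slightly across versions):

// Event types handled by JobScheduler.processEvent
private[scheduler] sealed trait JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent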

The JobCompleted event is handled by calling handleJobCompletion.

// JobScheduler.handleJobCompletion

private def handleJobCompletion(job: Job, completedTime: Long) {
	val jobSet = jobSets.get(job.time)
	jobSet.handleJobCompletion(job)
	job.setEndTime(completedTime)
	listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
	logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
	if (jobSet.hasCompleted) {
		jobSets.remove(jobSet.time)
		jobGenerator.onBatchCompletion(jobSet.time)
		logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
			jobSet.totalDelay / 1000.0, jobSet.time.toString,
			jobSet.processingDelay / 1000.0
		))
		listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
	}
	job.result match {
		case Failure(e) =>
			reportError("Error running job " + job, e)
		case _ =>
	}
}
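In handleJobCompletion, jobSet.hasCompleted is what gates the batch-level cleanup. A simplified, hypothetical reconstruction of the JobSet bookkeeping shows why it only becomes true once every output job of the batch has finished (Job here is a stand-in for Spark's internal class, not the real definition):

// Simplified sketch of JobSet's completion bookkeeping (not the real class)
case class Job(id: Int)

class SimpleJobSet(val time: Long, jobs: Seq[Job]) {
	private var incompleteJobs: Set[Job] = jobs.toSet

	// Called once per finished job, mirroring jobSet.handleJobCompletion(job)
	def handleJobCompletion(job: Job): Unit = synchronized {
		incompleteJobs -= job
	}

	// True only after the last job of this batch completes
	def hasCompleted: Boolean = synchronized { incompleteJobs.isEmpty }
}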

Once the whole batch has completed, handleJobCompletion removes the finished JobSet from jobSets and also calls jobGenerator.onBatchCompletion:

// JobGenerator.onBatchCompletion

def onBatchCompletion(time: Time) {
	eventLoop.post(ClearMetadata(time))
}

This posts a ClearMetadata message. Next, look at the definition of eventLoop in JobGenerator.start:

// JobGenerator.start (excerpt)

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
	override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

	override protected def onError(e: Throwable): Unit = {
		jobScheduler.reportError("Error in job generator", e)
	}
}
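EventLoop itself is a small utility: post enqueues an event, and a dedicated daemon thread drains the queue, dispatching each event to onReceive. A minimal sketch of the pattern, simplified from org.apache.spark.util.EventLoop (not the full implementation):

// Minimal sketch of the EventLoop pattern: a queue plus one dispatch thread
import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
	private val eventQueue = new LinkedBlockingDeque[E]()
	@volatile private var stopped = false

	private val eventThread = new Thread(name) {
		setDaemon(true)
		override def run(): Unit = {
			try {
				while (!stopped) {
					val event = eventQueue.take() // blocks until an event arrives
					try onReceive(event) catch { case e: Throwable => onError(e) }
				}
			} catch {
				case _: InterruptedException => // interrupted by stop() while blocked
			}
		}
	}

	def start(): Unit = eventThread.start()
	def stop(): Unit = { stopped = true; eventThread.interrupt() }
	def post(event: E): Unit = eventQueue.put(event)

	protected def onReceive(event: E): Unit
	protected def onError(e: Throwable): Unit
}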

So the message is handled by JobGenerator.processEvent, as designated by eventLoop.onReceive:

// JobGenerator.processEvent

private def processEvent(event: JobGeneratorEvent) {
	logDebug("Got event " + event)
	event match {
		case GenerateJobs(time) => generateJobs(time)
		case ClearMetadata(time) => clearMetadata(time)
		case DoCheckpoint(time, clearCheckpointDataLater) =>
			doCheckpoint(time, clearCheckpointDataLater)
		case ClearCheckpointData(time) => clearCheckpointData(time)
	}
}
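As with JobScheduler, these four messages are case classes defined alongside JobGenerator (paraphrased from Spark's scheduler source):

// Event types handled by JobGenerator.processEvent
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(time: Time, clearCheckpointDataLater: Boolean)
	extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent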

The ClearMetadata (metadata-cleanup) message is handled by clearMetadata:

// JobGenerator.clearMetadata

private def clearMetadata(time: Time) {
	ssc.graph.clearMetadata(time)

	// If checkpointing is enabled, then checkpoint,
	// else mark batch to be fully processed
	if (shouldCheckpoint) {
		eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
	} else {
		// If checkpointing is not enabled, then delete metadata information about
		// received blocks (block data not saved in any case). Otherwise, wait for
		// checkpointing of this batch to complete.
		val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
		jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
		jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
		markBatchFullyProcessed(time)
	}
}

As you can see, several cleanup tasks take place here. The receiverTracker and inputInfoTracker cleanup, however, has a precondition: checkpointing is not enabled. Otherwise, that cleanup is deferred until the checkpoint for this batch completes.
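Which branch runs is ultimately decided by user configuration: shouldCheckpoint is tied to whether a checkpoint directory (and checkpoint interval) has been set on the StreamingContext. A hedged sketch of the toggle (the app name and HDFS path are illustrative):

// Setting a checkpoint directory flips clearMetadata onto the DoCheckpoint path
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointToggle {
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName("CheckpointToggle").setMaster("local[2]")
		val ssc = new StreamingContext(conf, Seconds(10))

		// With this line, shouldCheckpoint is true: clearMetadata posts
		// DoCheckpoint(time, clearCheckpointDataLater = true) and block/batch
		// cleanup waits for the checkpoint (the ClearCheckpointData path).
		// Without it, receiverTracker and inputInfoTracker are cleaned
		// immediately and the batch is marked fully processed.
		ssc.checkpoint("hdfs://namenode:8020/checkpoints/my-app") // illustrative path

		// ... define streams and start the context as usual (omitted)
	}
}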
