The previous post finished analyzing the initial flow of data receiving in Spark Streaming. Next, we analyze how Spark Streaming cleans up its data.
Spark Streaming data cleanup

A Spark Streaming application runs continuously. Without effective management of memory resources, memory could quickly be exhausted, so Spark Streaming has its own cleanup mechanism for its objects, data, and metadata.
The objects, data, and metadata in a Spark Streaming application are produced by operations on DStreams.
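For a concrete picture of where that state comes from, here is a minimal sketch of a streaming application (the host, port, and batch interval are placeholders): every batch interval it materializes new RDDs, block metadata, and input info, which the cleanup path described below must eventually reclaim.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CleanupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CleanupDemo")
    // Each 10-second batch produces RDDs plus block metadata and input info,
    // all of which must eventually be cleaned up.
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}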
The overall cleanup flow is: JobScheduler.JobHandler.run posts JobCompleted; JobScheduler.handleJobCompletion reacts by calling JobGenerator.onBatchCompletion, which posts ClearMetadata; and JobGenerator.clearMetadata performs the actual cleanup.
The chain begins with job.run, invoked from JobScheduler.JobHandler:
// JobScheduler.JobHandler.run
def run() {
  try {
    ...
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}
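For context, JobHandler is a Runnable that JobScheduler submits to a small daemon thread pool, sized by spark.streaming.concurrentJobs (default 1). A simplified sketch of that submission path, not the exact Spark code:

import java.util.concurrent.Executors

// Simplified sketch: JobScheduler sizes its pool from
// spark.streaming.concurrentJobs (default 1), so by default the
// output jobs of a batch run one at a time.
val concurrentJobs = 1
val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

def submitJobSet(jobs: Seq[Runnable]): Unit =
  jobs.foreach(job => jobExecutor.execute(job)) // each JobHandler.run executes here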
When job.run() returns, run posts a JobCompleted message. JobScheduler.processEvent defines how these messages are handled:
// JobScheduler.processEvent
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}
The JobCompleted event is handled by handleJobCompletion:
// JobScheduler.handleJobCompletion
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) => reportError("Error running job " + job, e)
    case _ =>
  }
}
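The jobSet.hasCompleted check is what gates the cleanup. A simplified sketch of the bookkeeping a JobSet performs (illustration only; the real class also records submission, start, and end times for the delay statistics logged above):

import scala.collection.mutable

// Simplified sketch of JobSet's completion tracking (not the real class).
class JobSetSketch[Job](val time: Long, jobs: Seq[Job]) {
  private val incompleteJobs = mutable.HashSet(jobs: _*)

  // Called once for every finished output job of this batch.
  def handleJobCompletion(job: Job): Unit = incompleteJobs -= job

  // Only when every output job is done may the batch be cleaned up,
  // i.e. removed from jobSets and passed to onBatchCompletion.
  def hasCompleted: Boolean = incompleteJobs.isEmpty
}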
Besides removing the completed batch's JobSet from jobSets, handleJobCompletion also calls jobGenerator.onBatchCompletion:
// JobGenerator.onBatchCompletion
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
This posts a ClearMetadata message. Next, look at the definition of eventLoop in JobGenerator.start:
// JobGenerator.start (fragment)
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
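EventLoop here is a small private utility in org.apache.spark.util: a daemon thread that drains a queue and feeds each message to onReceive. A minimal sketch of the pattern, with simpler stop semantics than the real class:

import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

// Minimal sketch of the event-loop pattern used by JobGenerator
// (not Spark's actual EventLoop implementation).
abstract class EventLoopSketch[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          val event = eventQueue.take() // blocks until someone posts a message
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the thread; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event) // what onBatchCompletion calls

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}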
So each message posted to eventLoop is handled by JobGenerator.processEvent, as wired up in onReceive:
// JobGenerator.processEvent
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
The ClearMetadata message is handled by clearMetadata:
// JobGenerator.clearMetadata
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark the batch as fully processed.
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete the metadata about
    // received blocks (block data is not saved in any case). Otherwise,
    // wait for checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
Several cleanup actions happen here: the DStreamGraph clears its per-batch metadata, and the receiverTracker and inputInfoTracker are cleaned as well. The receiverTracker and inputInfoTracker cleanup shown in the else branch has a precondition: checkpointing is not enabled. When checkpointing is enabled, a DoCheckpoint message is posted with clearCheckpointDataLater = true, and the same cleanup is deferred until the checkpoint for this batch has completed.
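As a rough worked example of the time - maxRememberDuration threshold, assuming 10-second batches and a 60-second window (plain milliseconds stand in for Spark's Time and Duration classes, and the remember-duration formula is simplified):

// Illustrative arithmetic only (not Spark's Time/Duration classes).
val batchIntervalMs  = 10000L   // 10 s batches
val windowDurationMs = 60000L   // a 60 s window over the stream
// A windowed stream forces its inputs to remember enough past batches to
// rebuild the window, so rememberDuration grows to roughly window + batch.
val maxRememberDurationMs = windowDurationMs + batchIntervalMs
val currentBatchTimeMs = 1000000L // hypothetical batch timestamp
// Blocks and input info older than this threshold are safe to delete:
val cleanupThresholdMs = currentBatchTimeMs - maxRememberDurationMs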