override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new Iterator[(K, V)] {
// Build the split object by downcasting the generic Partition to NewHadoopPartition
// (original author's note: when reading HBase data, the wrapped split is a TableSplit).
private val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
private val conf = getConf
private val inputMetrics = context.taskMetrics().inputMetrics
// Bytes-read value captured when this iterator is created; updateBytesRead() adds the
// callback's reading on top of this baseline.
private val existingBytesRead = inputMetrics.bytesRead
// Sets InputFileBlockHolder for the file block's information
// (path, start and length for a FileSplit; cleared for any other split type).
split.serializableHadoopSplit.value match {
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
case _ =>
InputFileBlockHolder.unset()
}
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
// Only FileSplit and CombineFileSplit get a callback; every other split type yields None.
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
// Input byte counts are taken from Hadoop's thread-local FileSystem statistics.
// After a coalesce, one task may compute several partitions on the same thread, so the
// reading is added to the baseline captured when this iterator was created instead of
// overwriting what earlier partitions recorded (SPARK-13071).
// Does nothing when no bytes-read callback exists for this split type.
private def updateBytesRead(): Unit = getBytesReadCallback match {
  case Some(bytesReadOnThread) =>
    val totalBytes = existingBytesRead + bytesReadOnThread()
    inputMetrics.setBytesRead(totalBytes)
  case None => ()
}
// Build the input format used to create the record reader (original author's note: here
// the format is the Hadoop InputFormat implementation).
// Instantiate through the public no-arg constructor: Class.newInstance is deprecated since
// Java 9 because it rethrows checked constructor exceptions without wrapping them.
private val format = inputFormatClass.getConstructor().newInstance()
// Formats that implement Configurable receive the job configuration before use.
format match {
  case configurable: Configurable =>
    configurable.setConf(conf)
  case _ =>
}
// Build the task attempt id (a MAP-type attempt keyed by this split's index, attempt 0)
// and the Hadoop attempt context that is handed to the record reader.
private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
// True once the reader is exhausted or the split has been skipped. It is declared before
// `reader` because the catch handlers in the initializer below assign it.
private var finished = false
// Create and initialize the record reader for this split. When the file is missing or
// corrupt and the matching ignore flag is set, the split is skipped: a warning is logged,
// `finished` becomes true and `reader` is left as null. Any other exception propagates.
private var reader =
try {
val _reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
_reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
_reader
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
finished = true
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
e)
finished = true
null
}
// Register an on-task-completion callback to close the input stream.
// Note: the lambda parameter `context` shadows the outer `context` argument of compute.
context.addTaskCompletionListener[Unit] { context =>
// Update the bytesRead before closing is to make sure lingering bytesRead statistics in
// this thread get correctly added.
updateBytesRead()
close()
}
// True when hasNext has advanced the reader to a pair that next() has not yet consumed.
private var havePair = false
// NOTE(review): not referenced anywhere in the visible portion of this iterator.
private var recordsSinceMetricsUpdate = 0
// Reports whether another key/value pair is available. The reader is advanced only when
// the stream is not finished and no previously fetched pair is still pending.
override def hasNext: Boolean = {
  val mustAdvance = !(finished || havePair)
  if (mustAdvance) {
    try {
      val gotRecord = reader.nextKeyValue
      finished = !gotRecord
    } catch {
      case missing: FileNotFoundException =>
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        if (!ignoreMissingFiles) {
          throw missing
        }
        logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", missing)
        finished = true
      case corrupt: IOException if ignoreCorruptFiles =>
        logWarning(
          s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
          corrupt)
        finished = true
    }
    // close() also runs when the task completes, but releasing the reader as soon as the
    // split is exhausted helps tasks that read from many files.
    if (finished) {
      close()
    }
    havePair = !finished
  }
  !finished
}
// Returns the reader's current key/value pair and marks it as consumed.
// Throws NoSuchElementException when no further pair is available.
override def next(): (K, V) = {
  if (hasNext) {
    havePair = false
    if (!finished) {
      inputMetrics.incRecordsRead(1)
    }
    // Refresh the bytes-read metric once every UPDATE_INPUT_METRICS_INTERVAL_RECORDS records.
    val refreshDue =
      inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0
    if (refreshDue) {
      updateBytesRead()
    }
    val currentKey = reader.getCurrentKey
    val currentValue = reader.getCurrentValue
    (currentKey, currentValue)
  } else {
    throw new java.util.NoSuchElementException("End of stream")
  }
}
// [Web-page footer carried over with the snippet: "Comment list (0 comments)" — not part of the code.]