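- Read phase: the Map Task uses the user-supplied RecordReader to parse individual key/value pairs from its input InputSplit.
- Map phase: the parsed key/value pairs are passed to the user's map() function, which produces a series of new key/value pairs.
- Collect phase: once the user's map() function has processed a record, it calls OutputCollector.collect() to emit the result. This call partitions each generated key/value pair (by calling the Partitioner) and writes it into a circular (ring) buffer.
- Spill phase: when the ring buffer fills up (more precisely, once its usage reaches a soft limit, 80% by default, set by mapreduce.map.sort.spill.percent), the data is written to disk as a temporary file. Before writing, the data is sorted locally, and where necessary it is also combined, compressed, and so on.
- Combine phase: all temporary files are merged so that exactly one data file is produced in the end.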
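The Combine phase is essentially a k-way merge: every spill file is already sorted by partition and then by key, so the final output can be built by repeatedly taking the smallest head record among all spill runs. The sketch below models this with plain Java collections; the class `SpillMergeSketch`, the `Rec` type, and representing spill files as in-memory lists are assumptions for illustration, not Hadoop's `Merger` API. (Hadoop's `mergeParts()` merges one partition at a time, using each spill's index to locate that partition's segment; merging whole runs by (partition, key) yields the same order.)

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class SpillMergeSketch {
    // Hypothetical simplified map output record: partition number, key, value
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Order used inside every spill run: partition first, then key
    static final Comparator<Rec> ORDER =
        Comparator.comparingInt((Rec r) -> r.partition).thenComparing((Rec r) -> r.key);

    // Read position within one sorted spill run
    static final class Cursor {
        final List<Rec> run;
        int pos;

        Cursor(List<Rec> run) { this.run = run; }

        Rec head() { return run.get(pos); }
    }

    // k-way merge of already-sorted spill runs into one sorted list
    static List<Rec> merge(List<List<Rec>> spills) {
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> ORDER.compare(a.head(), b.head()));
        for (List<Rec> run : spills) {
            if (!run.isEmpty()) heap.add(new Cursor(run));
        }
        List<Rec> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();       // run whose head is currently smallest
            out.add(c.head());
            c.pos++;
            if (c.pos < c.run.size()) {
                heap.add(c);              // re-insert keyed on its new head
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> spill0 = List.of(new Rec(0, "a", 1), new Rec(0, "c", 1), new Rec(1, "b", 1));
        List<Rec> spill1 = List.of(new Rec(0, "b", 2), new Rec(1, "a", 2), new Rec(1, "d", 2));
        System.out.println(merge(List.of(spill0, spill1)));
    }
}
```

A heap keeps each step at O(log k) for k spill files, which matters when a large map output produces many spills.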
The most important part of a Map Task is how its output is organized in memory and on disk, which involves three phases: Collect, Spill, and Combine.
The entry point is the run method of the Map Task:
```java
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    }
  }
  // start the progress reporter
  TaskReporter reporter = startReporter(umbilical);

  // whether to use the new API
  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    // the old API; this is the path analyzed here
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
```
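To make the progress split concrete: when the phases run one after the other, an attempt with reducers reports roughly 0.667 × map progress + 0.333 × sort progress, while a map-only attempt reports map progress directly. The helper below is a hypothetical, simplified illustration of that weighting; it is not Hadoop's `Progress` class, which tracks phases as a tree.

```java
class ProgressWeightSketch {
    // Hypothetical helper using the same weights as run():
    // map-only jobs give the map phase 100%, otherwise map 0.667 and sort 0.333
    static float overall(int numReduceTasks, float mapProgress, float sortProgress) {
        if (numReduceTasks == 0) {
            return mapProgress;  // no sort phase exists
        }
        return 0.667f * mapProgress + 0.333f * sortProgress;
    }

    public static void main(String[] args) {
        System.out.println(overall(0, 0.5f, 0f));    // map-only, halfway through
        System.out.println(overall(3, 1.0f, 0.5f));  // map done, sort halfway
    }
}
```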
In the map() function, the user calls OldOutputCollector.collect(key, value). This function first calls partitioner.getPartition to obtain the record's partition number, then passes (key, value, partition) to MapOutputBuffer.collect() for further processing.
```java
@Override
public void collect(K key, V value) throws IOException {
  try {
    collector.collect(key, value,
                      partitioner.getPartition(key, value, numPartitions));
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
    throw new IOException("interrupt exception", ie);
  }
}
```
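The partitioner is pluggable; Hadoop's default is `HashPartitioner`, which computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The standalone sketch below applies that same formula to plain Java objects (the class name `HashPartitionSketch` is hypothetical) and shows why the mask matters: it clears the sign bit, so a key with a negative hash code still maps into `[0, numPartitions)` instead of producing a negative number that `MapOutputBuffer.collect()` would reject as an illegal partition.

```java
class HashPartitionSketch {
    // Same formula as Hadoop's default HashPartitioner, applied to any Object key
    static int getPartition(Object key, int numPartitions) {
        // & Integer.MAX_VALUE clears the sign bit, so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(key + " -> partition " + getPartition(key, numPartitions));
        }
        // Integer.hashCode() is the value itself, so -7 has a negative hash code
        Integer negative = -7;
        System.out.println(negative + " -> partition " + getPartition(negative, numPartitions));
        // Without the mask, Java's % would give -7 % 3 == -1, an illegal partition
    }
}
```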
The implementation of MapOutputBuffer.collect():

```java
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  reporter.progress();
  if (key.getClass() != keyClass) {
    throw new IOException("Type mismatch in key from map: expected "
                          + keyClass.getName() + ", received "
                          + key.getClass().getName());
  }
  if (value.getClass() != valClass) {
    throw new IOException("Type mismatch in value from map: expected "
                          + valClass.getName() + ", received "
                          + value.getClass().getName());
  }
  if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
        partition + ")");
  }
  checkSpillException();
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        if (!spillInProgress) {
          final int kvbidx = 4 * kvindex;
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }

  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);

    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();

    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid));

    // write accounting info
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
```
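Much of `collect()` is modular arithmetic on the single byte array `kvbuffer`: serialized key/value bytes are written forward from the equator, while metadata entries of `NMETA = 4` ints (partition, key start, value start, value length; `METASIZE = 16` bytes) are written backward from it, and `distanceTo` measures the forward distance between two positions around the ring. The sketch below reproduces only that arithmetic on a toy 64-byte buffer. The buffer size, the 0.8 spill fraction, and the class `RingBufferMathSketch` are illustrative assumptions; `initialKvindex` mirrors the alignment done by `setEquator`, and `softLimit` is computed the way the buffer size times the spill percentage would give it.

```java
class RingBufferMathSketch {
    static final int NMETA = 4;             // ints per metadata entry
    static final int METASIZE = NMETA * 4;  // bytes per metadata entry

    // Forward distance from i to j on a ring of size mod (same formula as distanceTo)
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    // First metadata slot (an int index) just before the equator, aligned to METASIZE
    static int initialKvindex(int equator, int bufferLength) {
        int aligned = equator - (equator % METASIZE);
        return ((aligned - METASIZE + bufferLength) % bufferLength) / 4;
    }

    // Metadata grows backward: step down by NMETA ints, wrapping around the ring
    static int advanceKvindex(int kvindex, int kvmetaCapacity) {
        return (kvindex - NMETA + kvmetaCapacity) % kvmetaCapacity;
    }

    public static void main(String[] args) {
        int length = 64;                        // toy kvbuffer size in bytes
        int softLimit = (int) (length * 0.8f);  // spill threshold in bytes
        int capacity = length / 4;              // kvmeta capacity in ints

        int kvindex = initialKvindex(0, length);
        System.out.println("softLimit=" + softLimit + " first kvindex=" + kvindex);
        for (int rec = 0; rec < 5; rec++) {
            kvindex = advanceKvindex(kvindex, capacity);
            System.out.println("after record " + rec + ": kvindex=" + kvindex);
        }
        // from byte 60 forward to byte 10 wraps past the end of the array
        System.out.println("distanceTo(60, 10) = " + distanceTo(60, 10, length));
    }
}
```

With equator 0, the first metadata entry occupies ints 12..15 (the last 16 bytes of the array), and each further record moves `kvindex` down by 4 ints until it wraps from 0 back to 12.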
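When a spill is triggered, the background SpillThread calls sortAndSpill(), which works as follows:
- Quicksort: sort the records in kvbuffer that lie in the range [bufstart, bufend]. Records are compared first by partition number and then by key, so after sorting the data is grouped by partition, and within each partition it is ordered by key. (Only the metadata entries are reordered; the serialized bytes stay in place.)
- Write each partition's data, in ascending order of partition number, to the temporary file output/spillN.out in the task's working directory (N is the spill sequence number). If the user has configured a Combiner, each partition's data is aggregated once before it is written.
- Record each partition's metadata in the in-memory index structure SpillRecord: its offset in the temporary file, its size before compression, and its size after compression.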
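The three spill steps can be modeled in a few dozen lines: sort by (partition, key), write the records partition by partition into one output, optionally combine each partition first, and record one index entry per partition (including empty ones). This is a simplified sketch under several assumptions: records are plain objects sorted directly (whereas MapOutputBuffer reorders only metadata), the "file" is a `ByteArrayOutputStream`, there is no compression (so the raw and compressed lengths coincide), and the combiner is a hypothetical sum over equal keys. `IndexEntry` merely mirrors the three per-partition fields listed above and is not Hadoop's own class.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SortAndSpillSketch {
    static final class Rec {
        final int partition; final String key; final int value;
        Rec(int partition, String key, int value) {
            this.partition = partition; this.key = key; this.value = value;
        }
    }

    // Offset in the spill output, size before compression, size after compression
    static final class IndexEntry {
        final long startOffset; final long rawLength; final long partLength;
        IndexEntry(long startOffset, long rawLength, long partLength) {
            this.startOffset = startOffset; this.rawLength = rawLength; this.partLength = partLength;
        }
        @Override public String toString() {
            return "(" + startOffset + "," + rawLength + "," + partLength + ")";
        }
    }

    static List<IndexEntry> sortAndSpill(List<Rec> records, int numPartitions,
                                         boolean useCombiner, ByteArrayOutputStream out) {
        // 1. sort by partition number, then by key
        List<Rec> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparingInt((Rec r) -> r.partition).thenComparing((Rec r) -> r.key));

        List<IndexEntry> index = new ArrayList<>();
        int i = 0;
        for (int p = 0; p < numPartitions; p++) {
            long start = out.size();
            // after sorting, each partition's records are contiguous
            List<Rec> run = new ArrayList<>();
            while (i < sorted.size() && sorted.get(i).partition == p) {
                run.add(sorted.get(i++));
            }
            // 2. optional combiner: hypothetical sum of values with equal keys
            if (useCombiner) {
                Map<String, Integer> sums = new TreeMap<>();
                for (Rec r : run) sums.merge(r.key, r.value, Integer::sum);
                run.clear();
                for (Map.Entry<String, Integer> e : sums.entrySet()) {
                    run.add(new Rec(p, e.getKey(), e.getValue()));
                }
            }
            for (Rec r : run) {
                byte[] bytes = (r.key + "\t" + r.value + "\n").getBytes(StandardCharsets.UTF_8);
                out.write(bytes, 0, bytes.length);
            }
            // 3. index entry; without compression both lengths are equal
            long length = out.size() - start;
            index.add(new IndexEntry(start, length, length));
        }
        return index;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(new Rec(1, "b", 1), new Rec(0, "a", 1),
                                 new Rec(1, "b", 1), new Rec(0, "c", 1));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(sortAndSpill(recs, 3, true, out));
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Writing an index entry even for an empty partition keeps lookups simple: the entry for partition p is always the p-th one, which is what lets a later merge jump straight to a partition's bytes.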