    public void flush() throws IOException, ClassNotFoundException,

           InterruptedException {

      LOG.info("Starting flush of map output");


      try {

        while (spillInProgress) {






        final int kvbend = 4 * kvend;



        if ((kvbend + METASIZE) % kvbuffer.length !=

            equator - (equator % METASIZE)) {

          // spill finished



   /**
    * Resets the spill bookkeeping so the next collection round starts from the
    * current equator.
    *
    * <p>Both serialized-data positions ({@code bufstart}, {@code bufend}) are set to
    * the equator. The metadata start/end indices ({@code kvstart}, {@code kvend}) are
    * set to the {@code METASIZE}-sized slot that immediately precedes the equator
    * rounded down to a {@code METASIZE} boundary, wrapping around the end of
    * {@code kvbuffer}. The resulting positions are logged at INFO level.
    */
   private void resetSpill() {
      // Read equator once so every position below is derived from the same value.
      final int e = equator;
      bufstart = bufend = e;
      // Round the equator down to the nearest multiple of METASIZE so the metadata
      // indices computed below land on a record boundary.
      final int aligned = e - (e % METASIZE);
      // set start/end to point to first meta record
      // Cast one of the operands to long to avoid integer overflow
      // Adding kvbuffer.length before the modulo keeps the result non-negative when
      // aligned < METASIZE (i.e. the slot wraps to the end of the buffer). The byte
      // offset is divided by 4 to obtain an index; presumably an index into an
      // int-typed view of kvbuffer (kvmeta) -- note the matching "* 4" in the log line.
      kvstart = kvend = (int)
        (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
      // Log both the index and its byte offset (index * 4) for kvstart and kvindex.
      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");




        if (kvindex != kvend) {


          kvend = (kvindex + NMETA) % kvmeta.capacity();

          bufend = bufmark;

          LOG.info("Spilling map output");

          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

                   "; bufvoid = " + bufvoid);

          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

                   "); kvend = " + kvend + "(" + (kvend * 4) +

                   "); length = " + (distanceTo(kvend, kvstart,

                         kvmeta.capacity()) + 1) + "/" + maxRec);


      } catch (InterruptedException e) {

        throw new IOException("Interrupted while waiting for the writer", e);

      } finally {




      assert !spillLock.isHeldByCurrentThread();

      // shut down spill thread and wait for it to exit. Since the preceding

      // ensures that it is finished with its work (and sortAndSpill did not

      // throw), we elect to use an interrupt instead of setting a flag.

      // Spilling simultaneously from this thread while the spill thread

      // finishes its work might be both a useful way to extend this and also

      // sufficient motivation for the latter approach.

      try {





      } catch (InterruptedException e) {

        throw new IOException("Spill failed", e);


      // release sort buffer before the merge

      kvbuffer = null;



      Path outputPath = mapOutputFile.getOutputFile();








    private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {

      // get the approximate size of the final output/index files

      long finalOutFileSize = 0;

      long finalIndexFileSize = 0;

      final Path[] filename = new Path[numSpills];

      final TaskAttemptID mapId = getTaskID();

      for(int i = 0; i < numSpills; i++) {


        filename[i] = mapOutputFile.getSpillFile(i);


        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();//获取文件大小


      if (numSpills == 1) {





        if (indexCacheList.size() == 0) {


          sameVolRename(mapOutputFile.getSpillIndexFile(0),            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));//spillIndexFile改名。

        } else {




mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);




      // read in paged indices

      for (int i = indexCacheList.size(); i < numSpills; ++i) {


        Path indexFileName = mapOutputFile.getSpillIndexFile(i);

        indexCacheList.add(new SpillRecord(indexFileName, job));



      //make correction in the length to include the sequence file header

      //lengths for each partition

      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;


      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;


      Path finalOutputFile =


      Path finalIndexFile =


      //The output stream for the final single output file

      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);


      if (numSpills == 0) {


        //create dummy files

        IndexRecord rec = new IndexRecord();


        SpillRecord sr = new SpillRecord(partitions);


        try {

          for (int i = 0; i < partitions; i++) {

            long segmentStart = finalOut.getPos();

            FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

            Writer<K, V> writer =

              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);



            rec.startOffset = segmentStart;

            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

            sr.putIndex(rec, i);


          sr.writeToFile(finalIndexFile, job);


        } finally {







        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

        IndexRecord rec = new IndexRecord();

        final SpillRecord spillRec = new SpillRecord(partitions);

        for (int parts = 0; parts < partitions; parts++) {


          //create the segments to be merged

          List<Segment<K,V>> segmentList =

            new ArrayList<Segment<K, V>>(numSpills);


          for(int i = 0; i < numSpills; i++) {


            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =

              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,

                               indexRecord.partLength, codec, true);

            segmentList.add(i, s);

            if (LOG.isDebugEnabled()) {

              LOG.debug("MapId=" + mapId + " Reducer=" + parts +

                  "Spill =" + i + "(" + indexRecord.startOffset + "," +

                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");



          int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);


          boolean sortSegments = segmentList.size() > mergeFactor;



          RawKeyValueIterator kvIter = Merger.merge(job, rfs,

                         keyClass, valClass, codec,

                         segmentList, mergeFactor,

                         new Path(mapId.toString()),

                         job.getOutputKeyComparator(), reporter, sortSegments,

                         null, spilledRecordsCounter, sortPhase.phase(),



          //write merged output to disk

          long segmentStart = finalOut.getPos();

          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

          Writer<K, V> writer =

              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,


          if (combinerRunner == null || numSpills < minSpillsForCombine) { // minSpillsForCombine在MapOutputBuffer构造函数内被初始化,numSpills 为mapTask已经溢写到磁盘spill文件数量

            Merger.writeFile(kvIter, writer, reporter, job);


  public static <K extends Object, V extends Object>

  void writeFile(RawKeyValueIterator records, Writer<K, V> writer,

                 Progressable progressable, Configuration conf)

  throws IOException {

    long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS,


    long recordCtr = 0;

    while(records.next()) {

      writer.append(records.getKey(), records.getValue());


      if (((recordCtr++) % progressBar) == 0) {





          } else {




            combinerRunner.combine(kvIter, combineCollector);






          // record offsets

          rec.startOffset = segmentStart;


          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

          spillRec.putIndex(rec, parts);


        spillRec.writeToFile(finalIndexFile, job);




        for(int i = 0; i < numSpills; i++) {






该方法会将所有临时文件合并成一个大文件，保存到 output/file.out 中，同时生成相应的索引文件 output/file.out.index。在进行文件合并的过程中，Map Task 以分区为单位进行合并。对于某个分区，它将采用多轮递归合并的方式：每轮合并 io.sort.factor（默认是 100，即代码中通过 JobContext.IO_SORT_FACTOR 读取的值）个文件，并将产生的文件重新加入待合并列表中，对文件排序后重复上述过程，直到只剩一个文件。只生成一个文件，可以避免同时打开大量文件，以及同时读取大量小文件所产生的随机读取开销。最后会删除所有的 spill 文件。





    if (useNewApi) {

      runNewMapper(job, splitMetaInfo, umbilical, reporter);

    } else {

      runOldMapper(job, splitMetaInfo, umbilical, reporter);


    done(umbilical, reporter);



  public void done(TaskUmbilicalProtocol umbilical,

                   TaskReporter reporter

                   ) throws IOException, InterruptedException {

    LOG.info("Task:" + taskId + " is done."

             + " And is in the process of committing");



    boolean commitRequired = isCommitRequired();

    if (commitRequired) {

      int retries = MAX_RETRIES;


      // say the task tracker that task is commit pending

      while (true) {

        try {

          umbilical.commitPending(taskId, taskStatus);



        } catch (InterruptedException ie) {

          // ignore

        } catch (IOException ie) {

          LOG.warn("Failure sending commit pending: " +


          if (--retries == 0) {





      //wait for commit approval and commit

      commit(umbilical, reporter, committer);




    // Make sure we send at least one set of counter increments. It's

    // ok to call updateCounters() in this thread after comm thread stopped.



    //signal the tasktracker that we are done



  private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {

    int retries = MAX_RETRIES;

    while (true) {

      try {



        LOG.info("Task '" + taskId + "' done.");


      } catch (IOException ie) {

        LOG.warn("Failure signalling completion: " +


        if (--retries == 0) {

          throw ie;





