1. 程式人生 > >大資料:Map終結和Spill檔案合併

大資料:Map終結和Spill檔案合併

當Mapper沒有資料輸入,mapper.run中的while迴圈會呼叫context.nextKeyValue就返回false,於是便返回到runNewMapper中,在這裡程式會關閉輸入通道和輸出通道,這裡關閉輸出通道並沒有關閉collector,必須要先flush一下。

 獲取更多大資料視訊資料請加QQ群:947967114       程式碼結構:

Maptask.runNewMapper->NewOutputCollector.close->MapOutputBuffer.flush

我們看flush幫我們做了什麼事情,為什麼要flush。

    public void flush() throws IOException, ClassNotFoundException,

           InterruptedException {

      LOG.info("Starting flush of map output");

      spillLock.lock();

      try {

        while (spillInProgress) {

          reporter.progress();

          spillDone.await();

//這裡檢視spillInProgress狀態,如果有spill就等待完成,並且報告狀態。

        }

        checkSpillException();

 

        final int kvbend = 4 * kvend;

//kvend是元資料塊的終點,元資料是向下伸展的。

//kvend是以整數計的陣列下標,kvbend是以位元組計的陣列下標

        if ((kvbend + METASIZE) % kvbuffer.length !=

            equator - (equator % METASIZE)) {

//這個條件說明緩衝區中原來有資料,現在spill已經完成,需要釋放空間。 獲取更多大資料視訊資料請加QQ群:947967114

          // spill finished

//spill一次需要調整一些引數,以釋放空間,這個工作通過resetSpill完成

          resetSpill();

   private void resetSpill() {

      final int e = equator;

      bufstart = bufend = e;

      final int aligned = e - (e % METASIZE);

      // set start/end to point to first meta record

      // Cast one of the operands to long to avoid integer overflow

      kvstart = kvend = (int)

        (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;

      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +

        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");

}

//這裡其實就是在調整各個引數的位置。比如原點位,kvstart等。

        }

        if (kvindex != kvend) {

//再來判斷緩衝區是否為空,如果不空表示不滿足spill條件(80%),但map處理完成沒有資料輸入。

          kvend = (kvindex + NMETA) % kvmeta.capacity();

          bufend = bufmark;

          LOG.info("Spilling map output");

          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

                   "; bufvoid = " + bufvoid);

          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

                   "); kvend = " + kvend + "(" + (kvend * 4) +

                   "); length = " + (distanceTo(kvend, kvstart,

                         kvmeta.capacity()) + 1) + "/" + maxRec);

          sortAndSpill();

//呼叫一次sortAndSpill過程。 獲取更多大資料視訊資料請加QQ群:947967114

        }

      } catch (InterruptedException e) {

        throw new IOException("Interrupted while waiting for the writer", e);

      } finally {

        spillLock.unlock();

      }

//至此所有資料都已經溢寫出去,緩衝區已空,所有資料都spill到檔案中

      assert !spillLock.isHeldByCurrentThread();

      // shut down spill thread and wait for it to exit. Since the preceding

      // ensures that it is finished with its work (and sortAndSpill did not

      // throw), we elect to use an interrupt instead of setting a flag.

      // Spilling simultaneously from this thread while the spill thread

      // finishes its work might be both a useful way to extend this and also

      // sufficient motivation for the latter approach.

      try {

        spillThread.interrupt();

//讓spill執行緒不在執行

        spillThread.join();

//結束spill執行緒

      } catch (InterruptedException e) {

        throw new IOException("Spill failed", e);

      }

      // release sort buffer before the merge

      kvbuffer = null;

      mergeParts();

//合併spill檔案

      Path outputPath = mapOutputFile.getOutputFile();

      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());

}

flush的目的,首先讓緩衝區的所有KV對資料都進入spill檔案,因為每次spill都會產生一個spill檔案,所有spill檔案可能不止一個,所以要把spill檔案合併到單個檔案中,分發給reduce。

所以如果有spill正在進行必須等待其完成,也可能沒有spill但是緩衝區非空,需要再一次sortAndSpill,總之要把緩衝區清空為止。所有資料都spill完成後就可以進行mergeParts了

程式碼結構:

Maptask.runNewMapper--->NewOutputCollector.close--->MapOutputBuffer.flush--->MapOutputBuffer.mergeParts

原始碼如下:

    private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {

      // get the approximate size of the final output/index files

      long finalOutFileSize = 0;

      long finalIndexFileSize = 0;

      final Path[] filename = new Path[numSpills];

//每次溢寫都會有一個檔案,所以陣列的大小是numSpills。 獲取更多大資料視訊資料請加QQ群:947967114

      final TaskAttemptID mapId = getTaskID();

      for(int i = 0; i < numSpills; i++) {

//統計所有這些檔案合併之後的大小

        filename[i] = mapOutputFile.getSpillFile(i);

//通過spill檔案的編號獲取到指定的spill檔案路徑

        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();//獲取檔案大小

      }

      if (numSpills == 1) {

//合併輸出有倆檔案一個是output/file.out,一個是output/file.out.index

        sameVolRename(filename[0],

mapOutputFile.getOutputFileForWriteInVolume(filename[0]));

//換個檔名,在原檔名上加個file.out

        if (indexCacheList.size() == 0) {

//索引塊快取indexCacheList已空

          sameVolRename(mapOutputFile.getSpillIndexFile(0),            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));//spillIndexFile改名。

        } else {

//索引塊快取indexCacheList中還有索引記錄,要寫到索引檔案

          indexCacheList.get(0).writeToFile(

  //寫入檔案

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

        }

        sortPhase.complete();

        return;

//如果只有一個spill合併已經完成。 獲取更多大資料視訊資料請加QQ群:947967114

      }

 

      // read in paged indices

      for (int i = indexCacheList.size(); i < numSpills; ++i) {

//如果spill檔案不止一個,需要合併

        Path indexFileName = mapOutputFile.getSpillIndexFile(i);

        indexCacheList.add(new SpillRecord(indexFileName, job));

//先把所有的SpillIndexFile收集在一起。

      }

 

      //make correction in the length to include the sequence file header

      //lengths for each partition

      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;

//每個partition都有header

      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

//IndexFile,每個partition一個記錄。

      Path finalOutputFile =

         mapOutputFile.getOutputFileForWrite(finalOutFileSize);

      Path finalIndexFile =

          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

 

      //The output stream for the final single output file

      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

//建立合併,最終輸出。

      if (numSpills == 0) {

//要是沒有SipillFile生成,也建立一個空檔案

        //create dummy files

        IndexRecord rec = new IndexRecord();

//建立索引記錄

        SpillRecord sr = new SpillRecord(partitions);

//建立spill記錄

        try {

          for (int i = 0; i < partitions; i++) {

            long segmentStart = finalOut.getPos();

            FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

            Writer<K, V> writer =

              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);

            writer.close();

//建立後馬上關閉,形成空檔案。

            rec.startOffset = segmentStart;

            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

            sr.putIndex(rec, i);

          }

          sr.writeToFile(finalIndexFile, job);

//所以記錄寫入索引檔案

        } finally {

          finalOut.close();

        }

        sortPhase.complete();

        return;

      }

      {

        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

 

        IndexRecord rec = new IndexRecord();

        final SpillRecord spillRec = new SpillRecord(partitions);

        for (int parts = 0; parts < partitions; parts++) {

//finalOut最終輸出檔案。迴圈分割槽獲得所有spill檔案的該分割槽資料,合併寫入finalOut

          //create the segments to be merged

          List<Segment<K,V>> segmentList =

            new ArrayList<Segment<K, V>>(numSpills);

//建立Segment,資料段

          for(int i = 0; i < numSpills; i++) {

//準備合併所有的Spill檔案

            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =

              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,

                               indexRecord.partLength, codec, true);

            segmentList.add(i, s);

//把每個Spill檔案中相同partition的區段位置收集起來。 獲取更多大資料視訊資料請加QQ群:947967114

            if (LOG.isDebugEnabled()) {

              LOG.debug("MapId=" + mapId + " Reducer=" + parts +

                  "Spill =" + i + "(" + indexRecord.startOffset + "," +

                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");

            }

          }

 

          int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

//做merge操作時同時操作的stream數上限

          boolean sortSegments = segmentList.size() > mergeFactor;

          //對segment進行排序

          @SuppressWarnings("unchecked")

          RawKeyValueIterator kvIter = Merger.merge(job, rfs,

                         keyClass, valClass, codec,

                         segmentList, mergeFactor,

                         new Path(mapId.toString()),

                         job.getOutputKeyComparator(), reporter, sortSegments,

                         null, spilledRecordsCounter, sortPhase.phase(),

                         TaskType.MAP);

//合併同一partition在所有spill檔案中的內容,可能還需要sort,合併後的結構是一個序列。

          //write merged output to disk

          long segmentStart = finalOut.getPos();

          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);

          Writer<K, V> writer =

              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,

                               spilledRecordsCounter);

          if (combinerRunner == null || numSpills < minSpillsForCombine) { // minSpillsForCombine在MapOutputBuffer建構函式內被初始化,numSpills 為mapTask已經溢寫到磁碟spill檔案數量

            Merger.writeFile(kvIter, writer, reporter, job);

//將合併後的結果直接寫入檔案。下面看一下writeFile的原始碼;

  public static <K extends Object, V extends Object>

  void writeFile(RawKeyValueIterator records, Writer<K, V> writer,

                 Progressable progressable, Configuration conf)

  throws IOException {

    long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS,

        10000);

    long recordCtr = 0;

    while(records.next()) {

      writer.append(records.getKey(), records.getValue());

//追加的方式輸出到writer中

      if (((recordCtr++) % progressBar) == 0) {

        progressable.progress();

      }

}

回到主程式碼:

          } else {

//有combiner

            combineCollector.setWriter(writer);

//就插入combiner環節

            combinerRunner.combine(kvIter, combineCollector);

//將合併的結果經過combiner後寫入檔案

          }

 

          //close

          writer.close();//關閉writer通道

          sortPhase.startNextPhase();

 

          // record offsets

          rec.startOffset = segmentStart;

//從當前段的起點開始

          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);

          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);

          spillRec.putIndex(rec, parts);

        }

        spillRec.writeToFile(finalIndexFile, job);

//把spillFile寫入合併的indexFle

        finalOut.close();

//關閉最終輸出流

        for(int i = 0; i < numSpills; i++) {

          rfs.delete(filename[i],true);

//刪除所有spill檔案

        }

      }

}

該方法會將所有臨時檔案合併成一個大檔案儲存到output/file.out中,同時生成相應的索引檔案output/file.out.index。 在進行檔案合併的過程中,Map Task以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式:每輪合併io.sort.factor,預設是100,個檔案,並將產生的文 件重新加入待合併列表中,對檔案排序後,重複上述過程,直到只有一個檔案。只生產一個檔案可以避免同時開啟大量的檔案和同時讀取大量的小檔案產生的隨機讀 取帶來的開銷。最後會刪除所有的spill檔案。

  另外需要注意的是,mergeParts()中也有combiner的操作,但是需要滿足一定的條件:1、使用者設定了combiner;2、spill檔案的數量超過了minSpillsForCombine的值,對應配置項"min.num.spills.for.combine",可自行設定,預設是3。這倆必須同時具備才會在此啟動combiner的本地聚集操作。所以在Map階段有可能combiner會執行兩次,所以有可能你的combiner執行兩次之後輸出資料不符合預期了。

  這樣Map階段的任務就算完成了。主要是讀取資料然後寫入記憶體緩衝區,快取區滿足條件就會快排後並設定partition後,spill到本地檔案和索引檔案;如果有combiner,spill之前也會做一次聚集操作,待資料跑完會通過歸併合併所有spill檔案和索引檔案,如果有combiner,合併之前在滿足條件後會做一次綜合的聚集操作。map階段的結果都會儲存在本地中(如果有reducer的話),非HDFS。

Mapper完成對所有輸入檔案的處理,並將緩衝區的資料寫出到spill檔案之後,spill檔案的存在只有三種可能:沒有spill,一個spill,多個spill。針對這三種都需要一個最終的輸出檔案,不管內容有沒有,內容多少。這個最終檔案是和單個spill檔案是一樣的,按照partition分成若干段,然後是排好序的KV資料,這個merge操作結合之前的spill檔案進行sort。就構成了一次mergeSort,這個mergeSort只針對同一個Mapper的多個spill檔案,以後在Reducer那裡還會有Merge針對不同的Mapper檔案。

當Maptask完成後,從runNewMapper返回,下一個操作就是done。也就是MapTask的收尾工作。MapTask的收尾涉及到怎麼把生成的資料輸出交給ReduceTask。MapTask和ReduceTask都是擴充套件自Task。但是他們都沒有自己定義done函式,所以他們都呼叫了Task的done。

程式在這裡跳出runNewMapper     獲取更多大資料視訊資料請加QQ群:947967114

    if (useNewApi) {

      runNewMapper(job, splitMetaInfo, umbilical, reporter);

    } else {

      runOldMapper(job, splitMetaInfo, umbilical, reporter);

    }

    done(umbilical, reporter);

這個done我們點進去後發現是Task.done,原始碼如下;

 

  public void done(TaskUmbilicalProtocol umbilical,

                   TaskReporter reporter

                   ) throws IOException, InterruptedException {

    LOG.info("Task:" + taskId + " is done."

             + " And is in the process of committing");

    updateCounters();

//更新容器

    boolean commitRequired = isCommitRequired();

    if (commitRequired) {

      int retries = MAX_RETRIES;

      setState(TaskStatus.State.COMMIT_PENDING);

      // say the task tracker that task is commit pending

      while (true) {

        try {

          umbilical.commitPending(taskId, taskStatus);

          break;

//如果commitPending沒有發生異常,就退出,否則重試。

        } catch (InterruptedException ie) {

          // ignore

        } catch (IOException ie) {

          LOG.warn("Failure sending commit pending: " +

                    StringUtils.stringifyException(ie));

          if (--retries == 0) {

            System.exit(67);

          }

        }

      }

      //wait for commit approval and commit

      commit(umbilical, reporter, committer);

    }

    taskDone.set(true);

    reporter.stopCommunicationThread();

    // Make sure we send at least one set of counter increments. It's

    // ok to call updateCounters() in this thread after comm thread stopped.

    updateCounters();

    sendLastUpdate(umbilical);

    //signal the tasktracker that we are done

sendDone(umbilical);

實現sendDone的原始碼:

  private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {

    int retries = MAX_RETRIES;

    while (true) {

      try {

        umbilical.done(getTaskID());

//實際上這裡向MRAppMaster上的TaskAttemptImpl傳送TA_DONE事件

        LOG.info("Task '" + taskId + "' done.");

        return;

      } catch (IOException ie) {

        LOG.warn("Failure signalling completion: " +

                 StringUtils.stringifyException(ie));

        if (--retries == 0) {

          throw ie;

        }

      }

    }

  }

        umbilical.done(getTaskID()); 獲取更多大資料視訊資料請加QQ群:947967114

//實際上這裡向MRAppMaster上的TaskAttemptImpl傳送TA_DONE事件,在TA_DONE事件的驅動下,相應的TaskAttemptImpl物件的狀態機執行CleanupContainerTransition.transition,然後轉入SUCCESS_CONTAINER_CLEANUP狀態。注意這裡有一個TaskAttemptEventType.TA_DONE事件是由具體的MapTask所在節點上發出的,但不是引起的狀態機的跳變是在MRAppMaster節點上。對於Maptask,會有一個umbilical,就代表著MRAppMaster。

MPAppmaster接到CONTAINER_REMOTE_CLEANUP事件,ContainerLauncher通過RPC機制呼叫Maptask所在節點的ContainerManagerImpl.stopContainers.使這個MapTask的容器進入KILLED_BY_APPMASTER狀態從而不在活躍。操作成功後向相應的TaskAttemptImpl傳送TO_CONTAINER_CLEANED事件。如果一次TaskAttempt成功了,就意味著嘗試的任務也成功了,所以TaskAttempt的狀態關係到TaskImpl物件,taskImpl的掃描和善後,包括向上層的JobImpl物件傳送TaskState.SUCCESSED事件。向自身TaskImpl傳送的SUCCESSED事件會導致TaskImpl.handleTaskAttemptCompletion操作。

Mapper節點上產生一個過程setMapOutputServerAdress函式,把本節點的MapOutputServer地址設定成一個Web地址,意味著MapTask留下的資料輸出(合併後的spill檔案)可以通過HTTP連接獲取。至此Mapper的所有過程完成。 獲取更多大資料視訊資料請加QQ群:947967114