
HBase Source Code Analysis: The compact Flow on HRegion (Part 2)

 

March 3, 2016, 21:38:04 · 辰辰爸的技術部落格

Copyright notice: this is an original post by the blogger and may not be reproduced without permission. https://blog.csdn.net/lipeng_bigdata/article/details/50791205

        Following up on "HBase Source Code Analysis: The compact Flow on HRegion (Part 1)", we continue the analysis of the compact flow on HRegion. This post covers the compaction of the files under a single column family of a table, that is, HStore's compact() method. The code is as follows:

 

 
```java
/**
 * Compact the StoreFiles. This method may take some time, so the calling
 * thread must be able to block for long periods.
 *
 * <p>During this time, the Store can work as usual, getting values from
 * StoreFiles and writing new StoreFiles from the memstore.
 *
 * Existing StoreFiles are not destroyed until the new compacted StoreFile is
 * completely written-out to disk.
 *
 * <p>The compactLock prevents multiple simultaneous compactions.
 * The structureLock prevents us from interfering with other write operations.
 *
 * <p>We don't want to hold the structureLock for the whole time, as a compact()
 * can be lengthy and we want to allow cache-flushes during this period.
 *
 * <p> Compaction event should be idempotent, since there is no IO Fencing for
 * the region directory in hdfs. A region server might still try to complete the
 * compaction after it lost the region. That is why the following events are carefully
 * ordered for a compaction:
 * 1. Compaction writes new files under region/.tmp directory (compaction output)
 * 2. Compaction atomically moves the temporary file under region directory
 * 3. Compaction appends a WAL edit containing the compaction input and output files.
 * Forces sync on WAL.
 * 4. Compaction deletes the input files from the region directory.
 *
 * Failure conditions are handled like this:
 * - If RS fails before 2, compaction won't complete. Even if RS lives on and finishes
 * the compaction later, it will only write the new data file to the region directory.
 * Since we already have this data, this will be idempotent but we will have a redundant
 * copy of the data.
 * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The
 * RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL.
 * - If RS fails after 3, the region server who opens the region will pick up the
 * compaction marker from the WAL and replay it by removing the compaction input files.
 * Failed RS can also attempt to delete those files, but the operation will be idempotent
 *
 * See HBASE-2231 for details.
 *
 * @param compaction compaction details obtained from requestCompaction()
 * @throws IOException
 * @return Storefile we compacted into or null if we failed or opted out early.
 */
@Override
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
  assert compaction != null;
  List<StoreFile> sfs = null;

  // Obtain the CompactionRequest, cr, from the CompactionContext
  CompactionRequest cr = compaction.getRequest();

  try {
    // Do all sanity checking in here if we have a valid CompactionRequest
    // because we need to clean up after it on the way out in a finally
    // block below

    // Record the compaction start time, compactionStartTime
    long compactionStartTime = EnvironmentEdgeManager.currentTime();

    // Make sure the request is not null. getRequest() has in fact already checked
    // and guaranteed this, so why check again here? Let's keep that as a small
    // open question for now.
    assert compaction.hasSelection();

    // Get the set of files to compact, filesToCompact, from the request cr;
    // each element is a StoreFile instance.
    // This set is determined when the CompactionRequest is constructed, or when
    // requests are merged, from the passed-in parameters or from the file sets
    // attached to the other requests; that is, once a request exists,
    // filesToCompact exists.
    Collection<StoreFile> filesToCompact = cr.getFiles();

    // Make sure the set of files to compact, filesToCompact, is not empty
    assert !filesToCompact.isEmpty();

    // Make sure filesCompacting contains all of the files in filesToCompact
    synchronized (filesCompacting) {
      // sanity check: we're compacting files that this store knows about
      // TODO: change this to LOG.error() after more debugging
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }

    // Ready to go. Have list of files to compact.
    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + " into tmpdir=" + fs.getTempDir() + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));

    // Commence the compaction.
    // Start the compaction by calling CompactionContext's compact() method,
    // which returns the newly written files, newFiles
    List<Path> newFiles = compaction.compact();

    // TODO: get rid of this!
    // The parameter hbase.hstore.compaction.complete decides whether to bring the
    // compaction fully to completion.
    // Interesting: handled this way, old and new files coexist, the new files are
    // not moved into place and their Readers are closed, so the old files keep
    // serving reads. What is this for? Quickly applying the output for reads?
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      LOG.warn("hbase.hstore.compaction.complete is set to false");

      // Create the StoreFile list sfs, sized to newFiles
      sfs = new ArrayList<StoreFile>(newFiles.size());

      // For each newly written compacted file, create a StoreFile and Reader
      // around it, close the StoreFile's Reader, and add the StoreFile to sfs
      for (Path newFile : newFiles) {
        // Create storefile around what we wrote with a reader on it.
        StoreFile sf = createStoreFileAndReader(newFile);

        // Close its Reader
        sf.closeReader(true);
        sfs.add(sf);
      }

      // Return the compacted files
      return sfs;
    }

    // Do the steps necessary to complete the compaction.

    // Move the completed files into their proper place, create StoreFiles and
    // Readers, and return the StoreFile list sfs
    sfs = moveCompatedFilesIntoPlace(cr, newFiles);

    // Write a record of the compaction to the WAL
    writeCompactionWalRecord(filesToCompact, sfs);

    // Replace the StoreFiles:
    // 1. Remove all of the compacted (input) files, compactedFiles, and add the
    //    output files sfs to the StoreFileManager's storefiles, the list of store
    //    files currently serving reads in this Store;
    // 2. Remove the input files filesToCompact from filesCompacting, the list of
    //    files currently being compacted.
    replaceStoreFiles(filesToCompact, sfs);

    // Depending on the compaction type, bump the matching counters for metrics
    if (cr.isMajor()) { // major compaction
      // Accumulate the counters: cell count and size
      majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
      majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
    } else { // minor compaction
      // Accumulate the counters: cell count and size
      compactedCellsCount += getCompactionProgress().totalCompactingKVs;
      compactedCellsSize += getCompactionProgress().totalCompactedSize;
    }

    // At this point the store will use new files for all new scanners.
    // Complete the compaction: archive the old files (delete the compacted input
    // files compactedFiles from the filesystem, in fact an archival operation that
    // moves them from their original location into the archive directory), close
    // their Readers, and update the store size
    completeCompaction(filesToCompact, true); // Archive old files & update store size.

    // Log the end-of-compaction message
    logCompactionEndMessage(cr, sfs, compactionStartTime);

    // Return the StoreFile list sfs
    return sfs;
  } finally {
    // Finish the compaction request: the Region reports the end of the request,
    // and all of the request's input files are removed from filesCompacting
    finishCompactionRequest(cr);
  }
}
```

        Now let's outline the whole flow:

 

        1. First, obtain the CompactionRequest, cr, from the CompactionContext;

        2. Record the compaction start time compactionStartTime;

        3. Make sure the request is not null:

              getRequest() has in fact already checked and guaranteed that the request is not null, so why check again here? Let's keep that as a small open question.

        4. Obtain the set of files to compact, filesToCompact, from the request cr:

              The set holds StoreFile instances. It is determined when the CompactionRequest is constructed, or when requests are merged, from the passed-in parameters or from the file sets attached to the other requests; that is, once a request exists, filesToCompact exists.

        5. Make sure the set of files to compact, filesToCompact, is not empty;

        6. Make sure filesCompacting contains all of the files in filesToCompact:

              So when were those files added to filesCompacting?

        7. Start the compaction by calling CompactionContext's compact() method, obtaining the new compacted files newFiles:

              This is the core step: it reads the input files through scanners and writes all of their data into the new files. A later post will analyze it in depth.

        8. The parameter hbase.hstore.compaction.complete (default true) decides whether to bring the compaction fully to completion:

               8.1. If it is false:

                        8.1.1. Create the StoreFile list sfs, sized to newFiles;

                        8.1.2. For each new compacted file in newFiles, create a StoreFile and Reader, close the StoreFile's Reader, and add the StoreFile to the list sfs;

                        8.1.3. Return the compacted file list sfs;

               8.2. If it is true:

                        8.2.1. Move the completed files into their proper place, create StoreFiles and Readers, and return the StoreFile list sfs;

                        8.2.2. Write a compaction record to the WAL;

                        8.2.3. Replace the StoreFiles: remove all of the compacted input files compactedFiles and add the output files sfs to the StoreFileManager's storefiles, the list of store files currently serving reads in the Store; also remove the input files filesToCompact from the in-flight list filesCompacting.

                        8.2.4. Depending on the compaction type, bump the matching counters for metrics;

                        8.2.5. Complete the compaction: archive the old files (delete the compacted input files compactedFiles from the filesystem, in fact an archival operation that moves them from their original location into the archive directory), close their Readers, and update the store size;

                        8.2.6. Log the end-of-compaction message;

                        8.2.7. Finish the compaction request: the Region reports the end of the request, and the request's input files are removed from filesCompacting;

                        8.2.8. Return the StoreFile list sfs.

        That completes the walkthrough of the overall flow. Next, let's examine some of the details more closely.

        First, we will set aside the compact() method of CompactionContext that does the actual merging; for now it is enough to know that it reads the input files through scanners, writes all of their data into new files, and yields the set of those new files, newFiles. We will cover it in detail in a later post.
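
        To make that step concrete ahead of the dedicated post, here is a minimal, self-contained sketch of the merge-copy pattern it follows. The CellScanner and CellWriter interfaces below are simplified stand-ins invented purely for illustration, not HBase's real types; the real code also merges scanners over multiple files and honors deletes, versions, and TTLs.

```java
import java.io.IOException;

// Hypothetical, simplified stand-ins for illustration; not HBase's real types.
interface CellScanner {
  byte[] next() throws IOException; // returns null once all input cells are consumed
}

interface CellWriter {
  void append(byte[] cell) throws IOException;
  void close() throws IOException;
}

class CompactionCopySketch {
  /**
   * Drain a merged view of the input StoreFiles into one new output file:
   * the essence of what CompactionContext.compact() does, minus the details.
   */
  static void copyAll(CellScanner mergedInputs, CellWriter output) throws IOException {
    try {
      byte[] cell;
      while ((cell = mergedInputs.next()) != null) {
        // The old files are only read, never modified; all data lands in the new file.
        output.append(cell);
      }
    } finally {
      // Closing finishes a complete new file (written under region/.tmp in HBase).
      output.close();
    }
  }
}
```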

        Next, once the new compacted files newFiles have been obtained, the subsequent handling hinges on one parameter, hbase.hstore.compaction.complete, which decides whether the compaction is brought fully to completion. The essential difference between the complete and the incomplete ending is: which files continue to serve reads.
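
        Since the fork between the two endings is just a boolean read from the store's configuration, it can be toggled like any other HBase property. A minimal sketch, assuming a standard HBaseConfiguration; in practice you would normally leave it at its default of true:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionCompleteFlag {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Default is true: outputs are committed into the region directory and the
    // inputs are archived. false leaves the outputs unpublished, so the old
    // files keep serving reads, as described above.
    conf.setBoolean("hbase.hstore.compaction.complete", false);
    System.out.println(conf.getBoolean("hbase.hstore.compaction.complete", true));
  }
}
```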

        Look at the incomplete ending first. It creates StoreFile and Reader instances for each compacted output file but immediately closes the Readers on the new files, which means the old files continue to serve reads. The new and old files coexist, the old files stay where they were, and the column family's list of currently serving storefiles is unchanged, still holding the old files' StoreFile objects.

        The complete ending, by contrast, moves the finished files into their proper place, creates StoreFiles and Readers, and returns the StoreFile list sfs; writes a compaction record to the WAL; replaces the storefiles; bumps the matching counters for metrics depending on the compaction type; archives the old files (deletes the compacted input files compactedFiles from the filesystem, in fact an archival operation that moves them from their original location into the archive directory), closes their Readers, and updates the store size; and finally finishes the compaction request: the Region reports the end of the request and the request's input files are removed from filesCompacting. Lots of complex operations, so no rush; let's explain the tricky parts one by one:

        1. Move the completed files into their proper place, create StoreFiles and Readers, and return the StoreFile list sfs

        This is implemented by the moveCompatedFilesIntoPlace() method, whose code is as follows:

 

 
```java
private List<StoreFile> moveCompatedFilesIntoPlace(
    CompactionRequest cr, List<Path> newFiles) throws IOException {
  // Create the StoreFile list sfs
  List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());

  // Iterate over newFiles
  for (Path newFile : newFiles) {
    assert newFile != null;

    // Move the new file newFile into its proper place, and create a StoreFile
    // and Reader for it
    StoreFile sf = moveFileIntoPlace(newFile);
    if (this.getCoprocessorHost() != null) {
      this.getCoprocessorHost().postCompact(this, sf, cr);
    }
    assert sf != null;
    sfs.add(sf);
  }
  return sfs;
}
```

        It first creates the StoreFile list sfs, then iterates over the compacted files newFiles, moving each new file into its proper place and creating a StoreFile and Reader for it. The relocation itself is done by the moveFileIntoPlace() method, whose code is as follows:

 

 

 
```java
// Package-visible for tests
StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
  // Validate the new file
  validateStoreFile(newFile);

  // Move the file into the right spot
  Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);

  // Create a StoreFile and Reader
  return createStoreFileAndReader(destPath);
}
```

        Notice that the move is actually carried out by the commitStoreFile() method of HStore's member variable fs. This fs is of type HRegionFileSystem, an abstraction of the filesystem under an HRegion that implements the actual physical operations on files and directories. Let's look at its commitStoreFile() method:

 

 

 
```java
/**
 * Move the file from a build/temp location to the main family store directory.
 * @param familyName Family that will gain the file
 * @param buildPath {@link Path} to the file to commit.
 * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
 * @param generateNewName False if you want to keep the buildPath name
 * @return The new {@link Path} of the committed file
 * @throws IOException
 */
private Path commitStoreFile(final String familyName, final Path buildPath,
    final long seqNum, final boolean generateNewName) throws IOException {
  // Derive the store directory storeDir from the column family name familyName
  Path storeDir = getStoreDir(familyName);

  // If storeDir does not exist in the filesystem fs and creating it fails, throw
  if (!fs.exists(storeDir) && !createDir(storeDir))
    throw new IOException("Failed creating " + storeDir);

  String name = buildPath.getName();
  if (generateNewName) {
    name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
  }
  Path dstPath = new Path(storeDir, name);
  if (!fs.exists(buildPath)) {
    throw new FileNotFoundException(buildPath.toString());
  }
  LOG.debug("Committing store file " + buildPath + " as " + dstPath);
  // buildPath exists, therefore not doing an exists() check.
  if (!rename(buildPath, dstPath)) {
    throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
  }
  return dstPath;
}
```

        Very simple: derive the store directory storeDir from the column family name familyName, check for it and create it if necessary, take the file name name from buildPath, combine storeDir and name into the destination path dstPath, and move the file from buildPath to dstPath via the rename() method.
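
        This commit-by-rename step is what makes step 2 of the ordering in the compact() javadoc safe: a rename within a single HDFS filesystem is atomic, so readers see either the old state or the fully committed file, never a partial one. Below is a minimal sketch of the same pattern against the plain Hadoop FileSystem API; the paths are made up for illustration.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitByRename {
  /** Publish a finished temp file into a target directory via atomic rename. */
  static Path commit(FileSystem fs, Path tmpFile, Path targetDir) throws IOException {
    if (!fs.exists(targetDir) && !fs.mkdirs(targetDir)) {
      throw new IOException("Failed creating " + targetDir);
    }
    if (!fs.exists(tmpFile)) {
      throw new FileNotFoundException(tmpFile.toString());
    }
    Path dst = new Path(targetDir, tmpFile.getName());
    // rename() within one HDFS filesystem is atomic: the file appears at dst
    // all at once, never half-written.
    if (!fs.rename(tmpFile, dst)) {
      throw new IOException("Failed rename of " + tmpFile + " to " + dst);
    }
    return dst;
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    // Made-up example paths, for illustration only.
    commit(fs, new Path("/hbase/.tmp/f1"), new Path("/hbase/data/ns/tbl/region/cf"));
  }
}
```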

 

        The creation of the StoreFile and Reader is ultimately done by the createStoreFileAndReader() method, whose code is as follows:

 

 
```java
private StoreFile createStoreFileAndReader(final StoreFileInfo info)
    throws IOException {
  info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
  StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
      this.family.getBloomFilterType());
  storeFile.createReader();
  return storeFile;
}
```

        A StoreFile is a file that stores data. A Store usually contains one or more StoreFiles. Reader is an inner class of StoreFile, and it is the Reader that serves reads of the file's data.

 

        2. Write a compaction record to the WAL

        This is done by the writeCompactionWalRecord() method, whose code is as follows:

 

 
```java
/**
 * Writes the compaction WAL record.
 * @param filesCompacted Files compacted (input).
 * @param newFiles Files from compaction.
 */
private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
    Collection<StoreFile> newFiles) throws IOException {
  // If the region's WAL is null, return immediately
  if (region.getWAL() == null) return;

  // Add the paths of the compacted input files to the inputPaths list
  List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
  for (StoreFile f : filesCompacted) {
    inputPaths.add(f.getPath());
  }

  // Add the paths of the compaction output files to the outputPaths list
  List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
  for (StoreFile f : newFiles) {
    outputPaths.add(f.getPath());
  }

  // Get the HRegionInfo, info
  HRegionInfo info = this.region.getRegionInfo();

  // Build the compaction's description, CompactionDescriptor
  CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
      family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));

  // Use the WALUtil utility class's writeCompactionMarker() method to write a
  // compaction marker into the WAL
  WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
      this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
```

        The logic is fairly simple:

 

        1. Add the paths of the compacted input files to the inputPaths list;

        2. Add the paths of the compaction output files to the outputPaths list;

        3. Get the HRegionInfo, info;

        4. Build the compaction's description, CompactionDescriptor;

        5. Write a compaction marker into the WAL via the WALUtil utility class's writeCompactionMarker() method.

        First, the compaction's description, CompactionDescriptor: it carries the table name TableName, the region name EncodedRegionName, the column family name FamilyName, the store home directory StoreHomeDir, the compaction inputs CompactionInput, the compaction outputs CompactionOutput, and other key information, fully describing the compaction in detail. It is built as follows:

 

 
```java
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
    List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
  // compaction descriptor contains relative paths.
  // input / output paths are relative to the store dir
  // store dir is relative to region dir
  CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
      .setTableName(ByteStringer.wrap(info.getTableName()))
      .setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
      .setFamilyName(ByteStringer.wrap(family))
      .setStoreHomeDir(storeDir.getName()); //make relative
  for (Path inputPath : inputPaths) {
    builder.addCompactionInput(inputPath.getName()); //relative path
  }
  for (Path outputPath : outputPaths) {
    builder.addCompactionOutput(outputPath.getName());
  }
  builder.setRegionName(ByteStringer.wrap(info.getRegionName()));
  return builder.build();
}
```

        Finally, the WALUtil utility class's writeCompactionMarker() method writes a compaction marker into the WAL. Let's look at the code:

 

 

 
```java
/**
 * Write the marker that a compaction has succeeded and is about to be committed.
 * This provides info to the HMaster to allow it to recover the compaction if
 * this regionserver dies in the middle (This part is not yet implemented). It also prevents
 * the compaction from finishing if this regionserver has already lost its lease on the log.
 * @param sequenceId Used by WAL to get sequence Id for the waledit.
 */
public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
    final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
  // Get the table name tn from the compaction description CompactionDescriptor
  TableName tn = TableName.valueOf(c.getTableName().toByteArray());

  // we use HLogKey here instead of WALKey directly to support legacy coprocessors.

  // Create a WALKey from the region's encoded name and the table name tn
  WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);

  // Append a record to the WAL, comprising the table descriptor HTableDescriptor,
  // the WALKey, the compaction WALEdit, and the sequence number sequenceId.
  // The compaction WALEdit is built by WALEdit's createCompaction() method from
  // the HRegionInfo and the CompactionDescriptor.
  log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);

  // Sync the log
  log.sync();
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
  }
}
```

        It actually appends one record to the WAL, comprising the table descriptor HTableDescriptor, the WALKey, the compaction WALEdit, and the sequence number sequenceId; the compaction WALEdit is built by WALEdit's createCompaction() method from the HRegionInfo and the CompactionDescriptor. The code is as follows:

 

 

 
```java
/**
 * Create a compaction WALEdit
 * @param c
 * @return A WALEdit that has <code>c</code> serialized as its value
 */
public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
  // Serialize the CompactionDescriptor into a byte []
  byte [] pbbytes = c.toByteArray();

  // Build a KeyValue out of the region's start key, the "METAFAMILY" family,
  // the "HBASE::COMPACTION" qualifier, the current time, and the serialized
  // CompactionDescriptor as the value
  KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
      EnvironmentEdgeManager.currentTime(), pbbytes);

  // Add the KeyValue to a WALEdit and return the WALEdit instance
  return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
```

        The code comments cover this in detail, so no further elaboration is needed.

 

        3. Replace the StoreFiles, which involves two things:

             (1) Remove all of the pre-compaction, i.e. already compacted, input files compactedFiles, and add the compaction output files sfs to the StoreFileManager's storefiles, the list of store files currently serving reads in the Store;

             (2) Remove the input files filesToCompact from filesCompacting, the list of files currently being compacted.

        The code of the replaceStoreFiles() method is as follows:

 

 
```java
@VisibleForTesting
void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
    final Collection<StoreFile> result) throws IOException {
  // Take the write lock of the ReentrantReadWriteLock, i.e. an exclusive lock
  this.lock.writeLock().lock();
  try {
    // Via the StoreFileManager's addCompactionResults() method:
    // remove all of the compacted input files, compactedFiles, and add the
    // output files to the StoreFileManager's storefiles, the list of store
    // files currently serving reads in the Store
    this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);

    // Remove the compacted input files from filesCompacting, the in-flight list
    filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock();
  } finally {
    // Unlock
    this.lock.writeLock().unlock();
  }
}
```

        4. Complete the compaction: archive the old files (delete the compacted input files compactedFiles from the filesystem, in fact an archival operation that moves them from their original location into the archive directory), close their Readers, and update the store size. The code of completeCompaction() is as follows:

 

 

 
```java
/**
 * <p>It works by processing a compaction that's been written to disk.
 *
 * <p>It is usually invoked at the end of a compaction, but might also be
 * invoked at HStore startup, if the prior execution died midway through.
 *
 * <p>Moving the compacted TreeMap into place means:
 * <pre>
 * 1) Unload all replaced StoreFile, close and collect list to delete.
 * 2) Compute new store size
 * </pre>
 *
 * @param compactedFiles list of files that were compacted
 */
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
    throws IOException {
  try {
    // Do not delete old store files until we have sent out notification of
    // change in case old files are still being accessed by outstanding scanners.
    // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
    // scenario that could have happened if continue to hold the lock.
    // Notify the Reader observers
    notifyChangedReadersObservers();
    // At this point the store will use new files for all scanners.

    // let the archive util decide if we should archive or delete the files
    LOG.debug("Removing store files after compaction...");

    // Iterate over the compacted input files compactedFiles and close their Readers
    for (StoreFile compactedFile : compactedFiles) {
      compactedFile.closeReader(true);
    }

    // Delete the compacted input files compactedFiles from the filesystem; this
    // is in fact an archival operation that moves the old files from their
    // original location into the archive directory
    if (removeFiles) {
      this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
    }
  } catch (IOException e) {
    e = RemoteExceptionHandler.checkIOException(e);
    LOG.error("Failed removing compacted files in " + this +
        ". Files we were trying to remove are " + compactedFiles.toString() +
        "; some of them may have been already removed", e);
  }

  // 4. Compute new store size
  this.storeSize = 0L;
  this.totalUncompressedBytes = 0L;

  // Iterate over the StoreFiles, accumulating storeSize, totalUncompressedBytes, etc.
  for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
    StoreFile.Reader r = hsf.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + hsf + " has a null Reader");
      continue;
    }
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
  }
}
```

 

        The rest is covered by the comments in the code. One thing worth singling out, though, is HRegionFileSystem's removeStoreFiles() method:

 
```java
/**
 * Closes and archives the specified store files from the specified family.
 * @param familyName Family that contains the store files
 * @param storeFiles set of store files to remove
 * @throws IOException if the archiving fails
 */
public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
    throws IOException {
  HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs,
      this.tableDir, Bytes.toBytes(familyName), storeFiles);
}
```

        It is ultimately carried out by HFileArchiver's archiveStoreFiles() method, whose code is as follows:

 

 

 
```java
/**
 * Remove the store files, either by archiving them or outright deletion
 * @param conf {@link Configuration} to examine to determine the archive directory
 * @param fs the filesystem where the store files live
 * @param regionInfo {@link HRegionInfo} of the region hosting the store files
 * @param family the family hosting the store files
 * @param compactedFiles files to be disposed of. No further reading of these files should be
 * attempted; otherwise likely to cause an {@link IOException}
 * @throws IOException if the files could not be correctly disposed.
 */
public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
    Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {

  // sometimes in testing, we don't have rss, so we need to check for that
  if (fs == null) {
    LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
        + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
    deleteStoreFilesWithoutArchiving(compactedFiles);
    return;
  }

  // short circuit if we don't have any files to delete
  // If the list of compacted input files, compactedFiles, is empty, return at once
  if (compactedFiles.size() == 0) {
    LOG.debug("No store files to dispose, done!");
    return;
  }

  // build the archive path
  if (regionInfo == null || family == null) throw new IOException(
      "Need to have a region and a family to archive from.");

  // Get the archive directory
  Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);

  // make sure we don't archive if we can't and that the archive dir exists
  // Create the directory
  if (!fs.mkdirs(storeArchiveDir)) {
    throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
        + Bytes.toString(family) + ", deleting compacted files instead.");
  }

  // otherwise we attempt to archive the store files
  if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");

  // Wrap the storefile into a File
  StoreToFile getStorePath = new StoreToFile(fs);
  Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);

  // do the actual archive
  // Perform the archiving via resolveAndArchive()
  if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
    throw new IOException("Failed to archive/delete all the files for region:"
        + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
        + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
  }
}
```

        Calls upon calls! Let's keep going and look at the key part:

 

 

 
```java
// If it is a file
if (file.isFile()) {
  // attempt to archive the file
  if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
    LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
    failures.add(file);
  }
}
```

        Note that resolveAndArchiveFile() does not simply delete the file; it moves the old store file into the archive path via the rename() method, as the following code shows:

 

 

 
```java
// move the archive file to the stamped backup
Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
if (!fs.rename(archiveFile, backedupArchiveFile)) {
  LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
      + ", deleting existing file in favor of newer.");
  // try to delete the existing file, if we can't rename it
  if (!fs.delete(archiveFile, false)) {
    throw new IOException("Couldn't delete existing archive file (" + archiveFile
        + ") or rename it to the backup file (" + backedupArchiveFile
        + ") to make room for similarly named file.");
  }
}
```

        5. Finish the compaction request: the Region reports the end of the compaction request, and the request's input files are removed from filesCompacting

        This part is done by the finishCompactionRequest() method, whose code is as follows:

 

 
```java
private void finishCompactionRequest(CompactionRequest cr) {
  // The Region reports the end of the compaction request
  this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());

  // If the request ran as an off-peak compaction, clear the off-peak flag
  if (cr.isOffPeak()) {
    offPeakCompactionTracker.set(false);
    cr.setOffPeak(false);
  }

  // Remove all of the request's input files from filesCompacting
  synchronized (filesCompacting) {
    filesCompacting.removeAll(cr.getFiles());
  }
}
```

        Readers can work through this one on their own; no further elaboration is needed.

 

        That's all for now; to be continued in the next installment!