1. 程式人生 > >HBase原始碼分析之HRegionServer上compact流程分析

HBase原始碼分析之HRegionServer上compact流程分析

        前面三篇文章中,我們詳細敘述了compact流程是如何在HRegion上進行的,瞭解了它的很多細節方面的問題。但是,這個compact在HRegionServer上是如何進行的?合併時檔案是如何選擇的呢?在這篇文章中,你將找到答案!

        首先,在HRegionServer內部,我們發現,它定義了一個CompactSplitThread型別的成員變數compactSplitThread,單看字面意思,這就是一個合併分裂執行緒,那麼它會不會就是HRegionServer上具體執行合併的工作執行緒呢?我們一步一步來看。

        要了解它是什麼,能夠做什麼,那麼就必須要看看它的實現,找到CompactSplitThread類,so,開始我們的分析之旅吧!

        首先,看下CompactSplitThread中都定義可哪些變數,如下:

// Configuration key for the large compaction threads.
  // large合併執行緒數引數hbase.regionserver.thread.compaction.large,預設值為1
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;
  
  // Configuration key for the small compaction threads.
  // small合併執行緒數引數hbase.regionserver.thread.compaction.small,預設值為1
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;
  
  // Configuration key for split threads
  // 分裂執行緒數引數hbase.regionserver.thread.split,預設值為1
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;
  
  // Configuration keys for merge threads
  // merge合併執行緒數引數hbase.regionserver.thread.merge,預設值為1
  public final static String MERGE_THREADS = "hbase.regionserver.thread.merge";
  public final static int MERGE_THREADS_DEFAULT = 1;

  // Region分分裂的限制
  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  // Region分分裂的限制預設值,為1000
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
  
  // HRegionServer例項server
  private final HRegionServer server;
  
  // Configuration配置例項conf
  private final Configuration conf;

  // long合併執行緒池longCompactions
  private final ThreadPoolExecutor longCompactions;
  // short合併執行緒池shortCompactions
  private final ThreadPoolExecutor shortCompactions;
  // 分裂執行緒池splits
  private final ThreadPoolExecutor splits;
  // merge合併執行緒池mergePool
  private final ThreadPoolExecutor mergePool;

  /**
   * Splitting should not take place if the total number of regions exceed this.
   * 如果region的總數超過這個限制,split就不應該發生。
   * This is not a hard limit to the number of regions but it is a guideline to
   * stop splitting after number of online regions is greater than this.
   * 這不是一個硬性的Region數目的限制,但是如果線上region的數目超過此限制它會是一個停止split的指南。
   */
  private int regionSplitLimit;
        其中,關於Region的Spilt、Merge相關的成員變數我們暫時忽略,等到專門講解split、merge時再單獨介紹。這裡,先了解下CompactSplitThread中都有哪些關於compact的成員變數,大體可以分為三類:

        1、第一類是配置引數及其預設值相關的,涉及到large、small合併執行緒數引數和其預設值以及HBase整體配置變數Configuration型別的conf;

        2、第二類是執行緒池,包括long合併執行緒池longCompactions和short合併執行緒池shortCompactions,它們統一使用的Java中的ThreadPoolExecutor;

        3、第三類是CompactSplitThread的載體,或者說工作的環境,HRegionServer例項server。

        既然已經存在合併的執行緒池,那麼很簡單,將合併執行緒扔到執行緒池中等待排程就是了。那麼是由哪些方法來完成的這一步呢?答案就在requestCompaction()及requestSystemCompaction()系列方法,而這一系列的requestCompaction()和requestSystemCompaction()方法引數不同,也僅意味著應用場景不同而已,最終還是要落到requestCompactionInternal()方法上的。同時,需要強調一點,requestCompaction()方法和requestSystemCompaction()方法有一個顯著的區別,那就是在最終呼叫requestCompactionInternal()方法時,前者傳入的selectNow為true,而後者傳入的selectNow為false,這點需要特別注意下,下面也會講到。先撇開都哪些地方會呼叫requestCompaction()系列方法,也就是compact發起的時機、條件等,我們後續會分析,這裡我們先來看下requestCompactionInternal(),程式碼如下:

/**
   * @param r HRegion store belongs to
   * @param s Store to request compaction on
   * @param why Why compaction requested -- used in debug messages
   * @param priority override the default priority (NO_PRIORITY == decide)
   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
   *          compaction will be used.
   */
  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow)
          throws IOException {
    
	// 首選做一些必要的環境判斷,比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作
	if (this.server.isStopped()
        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
      return null;
    }

    CompactionContext compaction = null;
    // 系統自動觸發的system compaction,selectNow引數為false,如果是hbase shell等人為觸發的合併,則selectNow為true
    if (selectNow) {
      // 通過hbase shell觸發的major compaction,selectNow為true.這裡進行實際的選取待合併檔案操作
      compaction = selectCompaction(r, s, priority, request);
      if (compaction == null) return null; // message logged inside
    }

    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    // 我們假設大部分合並都是small。所以,將系統引發的合併放進small pool,在那裡我們會做出選擇,如果有必要的話會挪至large pool
    
    // 也就是說,如果selectNow為false,即系統自身引發的合併,比如MemStore flush、compact檢查執行緒等,統一放入到shortCompactions中,即small pool
    // 而如果是人為觸發的,比如HBase shell,則還要看HStore中合併請求大小是否超過閾值,超過則放入longCompactions,即large pool,否則還是small pool
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
      ? longCompactions : shortCompactions;
    
    // 將合併請求包裝成CompactionRunner,扔進執行緒池去執行
    pool.execute(new CompactionRunner(s, r, compaction, pool));
    
    // 記錄debug級別的LOG資訊
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
    
    // 返回,如果是人為觸發的,返回合併請求,否則返回null
    return selectNow ? compaction.getRequest() : null;
  }
        直接說下大體流程吧!首先,需要做一些必要的檢查,比如比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作等,然後這裡有一個關鍵的地方,就是上述的selectNow,如果不是system compaction,selectNow為true,也就意味著它需要呼叫selectCompaction()方法,獲取CompactionContext,而這本質上就是要選取待合併檔案。我們先看下selectCompaction()方法,程式碼如下:
private CompactionContext selectCompaction(final HRegion r, final Store s,
      int priority, CompactionRequest request) throws IOException {
    
	// 呼叫HStore的requestCompaction()方法,獲取CompactionContext
	CompactionContext compaction = s.requestCompaction(priority, request);
    if (compaction == null) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
      }
      return null;
    }
    
    // 確保CompactionContext中合併請求request不為空
    assert compaction.hasSelection();
    
    // 設定priority
    if (priority != Store.NO_PRIORITY) {
      compaction.getRequest().setPriority(priority);
    }
    return compaction;
  }

        而這個方法最終還是呼叫HStore的requestCompaction()方法來獲取CompactionContext,繼續分析:

  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    
	// don't even select for compaction if writes are disabled
	// 如果對應HRegion不可寫,直接返回null
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before we do compaction, try to get rid of unneeded files to simplify things.
    // 在我們做合併之前,試著擺脫不必要的檔案來簡化事情
    removeUnneededFiles();

    // 通過儲存引擎storeEngine建立合併上下文CompactionContext
    CompactionContext compaction = storeEngine.createCompaction();
    CompactionRequest request = null;
    
    // 加讀鎖
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if coprocessor would want to override selection.
    	// 如果存在協處理器
        if (this.getCoprocessorHost() != null) {
        	
          // 通過CompactionContext的preSelect()方法,選擇StoreFile,返回StoreFilel列表
          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          
          // 如果存在協處理器,且其preCompactSelection()方法返回true,使用CompactionContext的forceSelect()方法,進行覆蓋
          boolean override = this.getCoprocessorHost().preCompactSelection(
              this, candidatesForCoproc, baseRequest);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case - coprocessor is not overriding file selection.
        if (!compaction.hasSelection()) {// 如果合併請求為空,即不存在協處理器
          
          // 是否為UserCompaction
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
        	// 呼叫CompactionContext的select()方法
            compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // Compaction policy doesn't want to take advantage of off-peak.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
        }

        // Selected files; see if we have a compaction with some custom base request.
        if (baseRequest != null) {// 如果之前傳入的請求不為空,則合併之
          // Update the request with what the system thinks the request should be;
          // its up to the request if it wants to listen.
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }
        
        // Finally, we have the resulting files list. Check if we have any files at all.
        // 獲取合併請求request
        request = compaction.getRequest();
        
        // 從合併請求request中獲取待合併檔案集合selectedFiles
        final Collection<StoreFile> selectedFiles = request.getFiles();
        if (selectedFiles.isEmpty()) {// 如果selectedFiles為空,直接返回null
          return null;
        }

        // 將選擇的檔案集合加入到filesCompacting中,解答了之前文章的疑問
        addToCompactingFiles(selectedFiles);

        // If we're enqueuing a major, clear the force flag.
        // 是否為major合併
        this.forceMajor = this.forceMajor && !request.isMajor();

        // Set common request properties.
        // Set priority, either override value supplied by caller or from store.
        // 設定優先順序
        request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        // 設定描述資訊
        request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      // 解除讀鎖
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - "  + getColumnFamilyName()
        + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
        + (request.isAllFiles() ? " (all files)" : ""));
    
    // 呼叫HRegion的reportCompactionRequestStart()方法,彙報一個compact請求開始
    this.region.reportCompactionRequestStart(request.isMajor());
    
    // 返回合併上下文compaction
    return compaction;
  }
        這裡我們只敘述下主要過程,requestCompaction()方法的處理邏輯大體如下:

        1、如果對應HRegion不可寫,直接返回null;

        2、在我們做合併之前,試著擺脫不必要的檔案來簡化事情;

        3、通過儲存引擎storeEngine建立合併上下文CompactionContext型別的compaction;

        4、加讀鎖;

        5、如果存在協處理器:通過CompactionContext的preSelect()方法,選擇StoreFile,返回StoreFilel列表;

        6、如果合併請求為空,即不存在協處理器:呼叫CompactionContext的select()方法,初始化compaction中的合併請求requst;

        7、如果之前傳入的請求baseRequest不為空,則合併之;

        8、獲取合併請求request;

        9、從合併請求request中獲取待合併檔案集合selectedFiles;

        10、將選擇的檔案集合加入到filesCompacting中,解答了之前文章的疑問;

        11、設定標誌位forceMajor:是否為major合併;

        12、request中設定優先順序、設定描述資訊;

        13、解除讀鎖;

        14、呼叫HRegion的reportCompactionRequestStart()方法,彙報一個compact請求開始;

        15、返回合併上下文compaction。

        現在我們著重看下如何通過呼叫CompactionContext的select()方法初始化compaction中的合併請求requst,其他步驟比較簡單,在此不一一敘述了。

        現在我們就看下其預設實現類DefaultCompactionContext中的select()方法,程式碼如下:

@Override
    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
      
      // 利用合併策略compactionPolicy的selectCompaction()方法,獲取合併請求request
      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
      
      // 返回是否得到request的標誌,true or false
      return request != null;
    }
        它是利用合併策略compactionPolicy的selectCompaction()方法,獲取合併請求request。那麼按照上面講的,我看下合併策略的一種實現RatioBasedCompactionPolicy的selectCompaction()方法實現,程式碼如下:
/**
   * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
   * @return subset copy of candidate list that meets compaction criteria
   * @throws java.io.IOException
   */
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    
	// Preliminary compaction subject to filters
	// 初步壓縮過濾器,即根據傳入的引數candidateFiles,建立一個候選的StoreFile列表
    // candidateFiles為通過storeFileManager.getStorefiles()方法獲取的Store下的全部儲存檔案
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    
    // 確定futureFiles,如果filesCompacting為空則為0,否則為1
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    
    // 從候選列表candidateSelection中排除正在合併的檔案,即filesCompacting中的檔案
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we can't have all files, we cannot do major anyway
    // 驗證是否包含所有檔案,設定標誌位isAllFiles,判斷的條件就是此時的候選列表candidateSelection大小是否等於初始的candidateFiles列表大小,
    // 而candidateFiles代表了Store下的全部檔案
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    
    // 如果沒有包含所有檔案,則不可能為一個Major合併
    if (!(forceMajor && isAllFiles)) {
      // 如果不是強制的Major合併,且不包含所有的檔案,則呼叫skipLargeFiles()方法,跳過較大檔案
      candidateSelection = skipLargeFiles(candidateSelection);
      
      // 再次確定標誌位isAllFiles
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested as a major compaction
    // 確定isTryingMajor,共三種情況:
    // 1、強制Major合併,且包含所有問檔案,且是一個使用者合併
    // 2、強制Major合併,且包含所有問檔案,或者本身就是一個Major合併,同時,必須是candidateSelection的數目小於配置的達到合併條件的最大檔案數目
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
        || (((forceMajor && isAllFiles) || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // Or, if there are any references among the candidates.
    // candidates中存在引用的話,則視為是在分裂後的檔案
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
    
    // 如果不是TryingMajor,且不是在分裂後
    if (!isTryingMajor && !isAfterSplit) {
      // We're are not compacting all files, let's see what files are applicable
      // 再次篩選檔案
      candidateSelection = filterBulk(candidateSelection);// 取出不應該位於Minor合併的檔案
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    
    // candidateSelection中移除過量的檔案
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, isTryingMajor);
    
    // Now we have the final file list, so we can determine if we can do major/all files.
    // 檢視是否為全部檔案
    isAllFiles = (candidateFiles.size() == candidateSelection.size());
    
    // 利用candidateSelection構造合併請求CompactionRequest物件result
    CompactionRequest result = new CompactionRequest(candidateSelection);
    
    // 設定請求中的標誌位
    result.setOffPeak(!candidateSelection.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
    
    // 返回合併請求CompactionRequest物件result
    return result;
  }
        我們撿重點的說,大體流程如下:

        1、根據傳入的引數candidateFiles,建立一個候選的StoreFile列表;

               candidateFiles為通過storeFileManager.getStorefiles()方法獲取的Store下的全部儲存檔案。

        2、確定futureFiles,如果filesCompacting為空則為0,否則為1;

        3、從候選列表candidateSelection中排除正在合併的檔案,即filesCompacting中的檔案;

        4、驗證是否包含所有檔案,設定標誌位isAllFiles,判斷的條件就是此時的候選列表candidateSelection大小是否等於初始的candidateFiles列表大小,而candidateFiles代表了Store下的全部檔案;

        5、如果不是強制的Major合併,且不包含所有的檔案,則呼叫skipLargeFiles()方法,跳過較大檔案,並再次確定標誌位isAllFiles;

        6、確定isTryingMajor,共兩種情況:

            (1)強制Major合併,且包含所有問檔案,且是一個使用者合併;

            (2)強制Major合併,且包含所有問檔案,或者本身就是一個Major合併,同時,必須是candidateSelection的數目小於配置的達到合併條件的最大檔案數目;

        7、candidates中存在引用的話,則視為是在分裂後的檔案,即isAfterSplit為true;

        8、如果不是TryingMajor,且不是在分裂後isAfterSplit,再次篩選檔案:

              8.1、通過filterBulk()方法取出不應該位於Minor合併的檔案;

              8.2、通過applyCompactionPolicy()方法,使用一定的演算法,進行檔案的篩選;

              8.3、通過checkMinFilesCriteria()方法,判斷是否滿足合併時最小檔案數的要求;

        9、通過removeExcessFiles()方法在candidateSelection中移除過量的檔案;

        10、檢視是否為全部檔案:再次確定標誌位isAllFiles;

        11、利用candidateSelection構造合併請求CompactionRequest物件result;

        12、設定請求中的標誌位;

        13、返回合併請求CompactionRequest物件result。

        我們主要分析下其中檔案篩選的一些方法。

        首先看跳過大檔案的skipLargeFiles()方法,程式碼如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all files above maxCompactSize
   * Also save all references. We MUST compact them
   */
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
      // 最主要的一個判斷,列表指定位置的檔案大小是否超過閾值comConf.getMaxCompactSize()
      // 這個閾值優先取引數hbase.hstore.compaction.max.size,引數未配置的話取Long.MAX_VALUE
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      // 由此可見candidates應該是一個以檔案大小倒序排序的列表
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }
        它會遍歷檔案列表candidates,最主要的一個判斷,列表指定位置的檔案大小是否超過閾值comConf.getMaxCompactSize(),這個閾值優先取引數hbase.hstore.compaction.max.size,引數未配置的話取Long.MAX_VALUE。

        其次再看下取出不應該位於Minor合併的檔案的filterBulk()方法,程式碼如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * exclude all bulk load files if configured
   */
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    
	// 去除掉不應該在Minor合併中的檔案:根據 StoreFile的標誌位excludeFromMinorCompaction判斷
	// 它的判斷是HFile資訊的元資料中存在EXCLUDE_FROM_MINOR_COMPACTION標誌
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }
        它根據StoreFile的標誌位excludeFromMinorCompaction判斷,而excludeFromMinorCompaction為true是當HFile資訊的元資料中存在EXCLUDE_FROM_MINOR_COMPACTION標誌時設定的,說了這麼多,其實它就是要排除BulkLoad進入HBase的檔案!

        然後,我們再看下比較複雜的applyCompactionPolicy()方法,程式碼如下:

/**
    * @param candidates pre-filtrate
    * @return filtered subset
    * -- Default minor compaction selection algorithm:
    * choose CompactSelection from candidates --
    * First exclude bulk-load files if indicated in configuration.
    * Start at the oldest file and stop when you find the first file that
    * meets compaction criteria:
    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
    * OR
    * (2) within the compactRatio of sum(newer_files)
    * Given normal skew, any newer files will also meet this criteria
    * <p/>
    * Additional Note:
    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
    * compact().  Consider the oldest files first to avoid a
    * situation where we always compact [end-threshold,end).  Then, the
    * last file becomes an aggregate of the previous compactions.
    *
    * normal skew:
    *
    *         older ----> newer (increasing seqID)
    *     _
    *    | |   _
    *    | |  | |   _
    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
    *    | |  | |  | |  | |  _  | |
    *    | |  | |  | |  | | | | | |
    *    | |  | |  | |  | | | | | |
    */
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    
	// 如果檔案列表為空,原樣返回  
	if (candidates.isEmpty()) {
      return candidates;
    }

    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    
    // 獲取檔案合併比例:取引數hbase.hstore.compaction.ratio,預設為1.2
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {// 如果不是在峰值使用
      // 取引數hbase.hstore.compaction.ratio.offpeak,預設為5.0
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    
    // 待合併檔案數目countOfFiles
    final int countOfFiles = candidates.size();
    
    // 用於存放檔案大小的陣列fileSizes
    long[] fileSizes = new long[countOfFiles];
    
    // 用於存放該檔案之後在最大檔案數這個範圍內所有檔案(包含該檔案)大小合計的陣列sumSize
    long[] sumSize = new long[countOfFiles];
    
    // 倒序遍歷candidates檔案淚飆
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      
      // 將檔案大小放入陣列fileSizes
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      // tooFar表示後移動最大檔案數位置的檔案大小,其實也就是剛剛滿足達到最大檔案數位置的那個檔案,也就是說,從i至tooFar數目為合併時允許的最大檔案數
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      
      // 計算合計:該檔案大小fileSizes[i] + (截止到下一個檔案大小sumSize[i + 1]) - 後移動最大檔案數位置的檔案大小
      // 也就是說sumSize[i]的值,涉及到的檔案數目,永遠是滿足合併時允許的最大檔案數這個閾值的,它相當於一個滑動的區間,區間大小為合併時允許的最大檔案數
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // 倒序迴圈,如果檔案數目滿足最小合併時允許的最小檔案數,且該位置的檔案大小,
    // 大於合併時允許的檔案最小大小與下一個檔案視窗檔案總大小乘以一定比例中的較大者,則繼續,
    // 實際上就是選擇出一個檔案視窗內能最小能滿足的檔案大小的一組檔案
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      // 保證最小檔案數目的要求
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    
    // 擷取
    candidates.subList(0, start).clear();
    return candidates;
  }
        這個applyCompactionPolicy()方法是RatioBasedCompactionPolicy合併策略的精髓,我們需要細細分析,它的主要步驟為:

        1、如果檔案列表為空,原樣返回;

        2、獲取檔案合併比例:取引數hbase.hstore.compaction.ratio,預設為1.2,如果可以在峰值使用,取引數hbase.hstore.compaction.ratio.offpeak,預設為5.0,也就是說將引數調整大些;

        3、計算待合併檔案數目countOfFiles;

        4、定義用於存放檔案大小的陣列fileSizes;

        5、定義用於存放該檔案之後在最大檔案數這個範圍內所有檔案(包含該檔案)大小合計的陣列sumSize;

        6、倒序遍歷candidates檔案列表:

              6.1、將檔案大小放入陣列fileSizes指定位置;

              6.2、tooFar表示後移動最大檔案數位置的檔案大小,其實也就是從i開始剛剛滿足達到最大檔案數位置的那個檔案,也就是說,從i至tooFar數目為合併時允許的最大檔案數,它類似於一個平滑的檔案視窗;

              6.3、計算合計:該檔案大小fileSizes[i] + (截止到下一個檔案大小sumSize[i + 1]) - 後移動最大檔案數位置的檔案大小,也就是說sumSize[i]對應的被統計檔案,永遠是滿足合併時允許的最大檔案數這個閾值的,它相當於一個滑動的區間,區間大小為合併時允許的最大檔案數,sumSize[i]對應的值為已該i開始所處檔案視窗的所有檔案大小合計。

        7、正序迴圈,如果檔案數目滿足最小合併時允許的最小檔案數,且該位置的檔案大小,大於合併時允許的檔案最小大小與下一個檔案視窗檔案總大小乘以一定比例中的較大者,則繼續,實際上就是選擇出一個檔案視窗內能兼顧最小檔案數和最小檔案大小的一組檔案;

        8、保證最小檔案數目的要求,必要時進行擷取;

        9、擷取並返回擷取後的檔案列表。

        上面的一箇中心思想就是選出滿足條件的最小的一組檔案來合併。

        緊接著,我們看下檢測是否滿足最小檔案數大的checkMinFilesCriteria()方法,程式碼如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * forget the compactionSelection if we don't have enough files
   */
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
          " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }
        很直接有木有,不滿足合併時最小檔案數要求,直接clear,太奔放了!

        最後,我們看下如何移除過量的檔案,即removeExcessFiles()方法,程式碼如下:

/**
   * @param candidates pre-filtrate
   * @return filtered subset
   * take upto maxFilesToCompact from the start
   */
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    // 是否過量:檔案列表大小減去滿足合併的最大檔案數
	int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
          + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

        它是要求待合併檔案數不能超過系統設定的合併時最大檔案數。

        至此,合併請求的生成和檔案的選擇就到此為止了。

        接下來再回到CompactSplitThread的requestCompactionInternal()方法,看下它對執行緒池是如何處理的。這裡,它首先假設大部分合並都是small,所以它將系統引發的合併放進small pool,然後在特定的時機再做決斷,如果有必要的話會挪至large pool。也就是說,如果selectNow為false,即系統自身引發的合併,比如MemStore flush、compact檢查執行緒等,統一放入到shortCompactions中,即small pool;而如果是人為觸發的,即selectNow為true,比如HBase shell觸發的,則還要看HStore中合併請求大小是否超過閾值,超過則放入longCompactions,即large pool,否則還是small pool。

        那麼這個HStore中合併請求大小是否超過閾值是如何計算的呢?我們跟蹤下HStore的throttleCompaction()方法,程式碼如下:

@Override
  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }
        它實際上是呼叫的合併策略CompactionPolicy的throttleCompaction()方法。那麼,都有哪幾種合併策略呢?總結起來,一共有兩種:RatioBasedCompactionPolicy和StripeCompactionPolicy。現在我們以RatioBasedCompactionPolicy為例來講,另一種StripeCompactionPolicy以後再分析。看下它的throttleCompaction()方法:
/**
   * @param compactionSize Total size of some compaction
   * @return whether this should be a large or small compaction
   */
  @Override
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }
        它是將傳入的compactionSize與comConf.getThrottlePoint()來比較的,傳入的compactionSize實際上為上面提到的compaction.getRequest().getSize(),也就是合併請求的大小totalSize,這個totalSize是通過CompactionRequest的recalculateSize()方法計算得到的,程式碼如下:
/**
   * Recalculate the size of the compaction based on current files.
   * @param files files that should be included in the compaction
   */
  private void recalculateSize() {
    long sz = 0;
    for (StoreFile sf : this.filesToCompact) {
      Reader r = sf.getReader();
      sz += r == null ? 0 : r.length();
    }
    this.totalSize = sz;
  }
        它遍歷待合併檔案StoreFile,獲取其Reader,通過它獲得檔案長度並累加至totalSize。

        而comConf是其父類CompactionPolicy中關於compact配置的CompactionConfiguration型別成員變數,其getThrottlePoint()方法如下:

/**
   * @return ThrottlePoint used for classifying small and large compactions
   */
  public long getThrottlePoint() {
    return throttlePoint;
  }
        實際上取得是CompactionConfiguration的成員變數throttlePoint,而throttlePoint在其構造方法中定義如下:
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
        優先取引數hbase.regionserver.thread.compaction.throttle,如果引數未配置,預設為最大合併檔案數maxFilesToCompact與MemStore flush大小的兩倍,而這個maxFilesToCompact的取值如下:
maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10);
        也就是取引數hbase.hstore.compaction.max,引數未配置的話預設為10。那麼MemStore flush大小是如何獲取的呢?它實際上是通過StoreConfigInformation介面的getMemstoreFlushSize()方法獲取的,而需要使用的最終實現該方法的類,還是HStore,程式碼如下:
@Override
  public long getMemstoreFlushSize() {
    // TODO: Why is this in here?  The flushsize of the region rather than the store?  St.Ack
    return this.region.memstoreFlushSize;
  }
        各位看官可能有疑問了,既然compact是以Store為單位進行的,為什麼這裡獲取的是region的memstoreFlushSize呢?我們知道,HBase並不是一個純粹意義上的列式資料庫,它的MemStore flush的發起,並不是以Store為單位進行的,而是整個Region,這也是HBase一開始飽受詬病的列簇Column Family不能過多的原因。那麼,這裡的memstoreFlushSize就可以很容易理解為什麼要獲取Region的了。

        這個memstoreFlushSize我們之前介紹過,這裡再回顧下,memstoreFlushSize為HRegion上設定的一個閾值,當MemStore的大小超過這個閾值時,將會發起flush請求,它的計算首先是由Table決定的,即每個表可以設定自己的memstoreFlushSize,通過關鍵字MEMSTORE_FLUSHSIZE來設定,如果MEMSTORE_FLUSHSIZE未設定,則取引數hbase.hregion.memstore.flush.size,引數未配置的話,則預設為1024*1024*128L,即128M。

        用俺們山東人的話來說,落落了這麼多,到底是什麼意思呢?很簡單,它就是看合併請求中涉及的資料量大小是否超過一個閾值,超過則放入large pool,未超過則放入small pool。這個閾值可以通過引數直接配置,不配置的話,則是最大可合併檔案數與引起MemStore的flush的閾值memstoreFlushSize的兩倍,這個memstore flush到檔案中,是不是就是檔案的總大小呢?檔案數乘以檔案大小,是不是邏輯上近似於待合併資料的大小呢?大體就是這麼個意思。

        好了,“資料”的目的地--執行緒池選好了,接下來就是該把“資料”放入執行緒池了。既然是執行緒池,那麼這個“資料”就應該是一個執行緒,我們繼續看。

// 將合併請求包裝成CompactionRunner,扔進執行緒池去執行
    pool.execute(new CompactionRunner(s, r, compaction, pool));

        這一句體現的再明白不過了,將HStore、HRegion、合併上下文CompactionContext、執行緒池ThreadPoolExecutor包裝成一個CompactionRunner物件,扔入執行緒池中執行。而CompactionRunner給我們的第一印象就是,它必定是一個可執行的執行緒。那麼我們就看下它的程式碼吧:

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification="Contrived use of compareTo")
  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
	
	// 類的定義中直接體現了,實現了Runnable介面意味著它是一個執行緒。
	
    private final Store store;
    private final HRegion region;
    private CompactionContext compaction;
    private int queuedPriority;
    private ThreadPoolExecutor parent;

    public CompactionRunner(Store store, HRegion region,
        CompactionContext compaction, ThreadPoolExecutor parent) {
      super();
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      
      // 合併排隊的優先順序,如果合併上下文compaction為空,則通過HStore的getCompactPriority()方法獲取,否則直接從合併請求中獲取,
      // 而合併請求中的,實際上也是通過呼叫requestCompactionInternal()方法的priority傳入的
      this.queuedPriority = (this.compaction == null)
          ? store.getCompactPriority() : compaction.getRequest().getPriority();
      this.parent = parent;
    }
  }
        先看類的定義,類的定義中直接體現了,實現了Runnable介面意味著它是一個執行緒。而它除了建構函式傳入的那四個成員變數外,還有個表示優先順序的成員變數queuedPriority,它的初始化是在構造方法中完成的。如果合併上下文compaction為空,則通過HStore的getCompactPriority()方法獲取,否則直接從合併請求中獲取,而合併請求中的,實際上也是通過呼叫requestCompactionInternal()方法的priority傳入的。我們接下來看下HStore的getCompactPriority()方法:
@Override
  public int getCompactPriority() {
	  
	// 從StoreFileManager中獲取Compact Priority
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }
        它轉而從StoreFileManager中獲取Compact Priority,繼續吧!在StoreFileManager的預設實現DefaultStoreFileManager中,程式碼如下:
  @Override
  public int getStoreCompactionPriority() {
	
	// blockingFileCount優先取引數hbase.hstore.blockingStoreFiles,未配置的話再預設為7
	// 還記得isTooManyStoreFiles這個方法嗎?MemStore在進行flush時會判斷HRegion上每個HStore下的檔案數是否太多,
    // 太多則意味著MemStore的flush會被推遲進行,優先進行compact,否則檔案數則會越來越多,而這裡,離blockingFileCount越遠,當前檔案數越小
	// 的話,則意味著MemStore的flush可以優先進行,而compact可以在它flush之後再進行,將資源利用效率最大化
    int blockingFileCount = conf.getInt(
        HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
    
    // 優先順序為上述blockingFileCount減去當前storefiles的數目
    int priority = blockingFileCount - storefiles.size();
    
    // 如果priority為1,則返回2,否則返回原值
    return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
  }
        優先順序為上述blockingFileCount減去當前storefiles的數目。而blockingFileCount優先取引數hbase.hstore.blockingStoreFiles,未配置的話再預設為7。還記得isTooManyStoreFiles這個方法嗎?MemStore在進行flush時會判斷HRegion上每個HStore下的檔案數是否太多,太多則意味著MemStore的flush會被推遲進行,優先進行compact,否則檔案數則會越來越多,而這裡,離blockingFileCount越遠,當前檔案數越小的話,則意味著MemStore的flush可以優先進行,而compact可以在它flush之後再進行,將資源利用效率最大化。

        接下來,我們在看下CompactionRunner中最重要的run()方法,程式碼如下:

@Override
    public void run() {
      Preconditions.checkNotNull(server);
      
      // 首選做一些必要的環境判斷,比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作
      if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
      // Common case - system compaction without a file selection. Select now.
      // 常見的,系統合併還沒有選擇待合併的檔案。現在選擇下。
      if (this.compaction == null) {
        
    	// 之前的Compact優先順序賦值給oldPriority
    	int oldPriority = this.queuedPriority;
        
        // 獲取HStore的Compact優先順序
        this.queuedPriority = this.store.getCompactPriority();
        
        // 如果當前優先順序queuedPriority大於之前的oldPriority
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          
          // 將該CompactionRunner在扔回執行緒池
          this.parent.execute(this);
          return;
        }
        
        // 選擇storefile
        try {
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          return;
        }
        if (this.compaction == null) return; // nothing to do
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.
        
        // 確保合併請求存在
        assert this.compaction.hasSelection();
        
        // 再次判斷下是應該在large池中執行還是應該在small池中執行,此次只根據上述的那個閾值來判斷
        ThreadPoolExecutor pool = store.throttleCompaction(
            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
        
        if (this.parent != pool) {// 換池了
          // HStore取消合併請求
          this.store.cancelRequestedCompaction(this.compaction);
          
          // 復位compaction為null
          this.compaction = null;
          
          // 換池
          this.parent = pool;
          
          // 放入執行緒池,後續會再初始化compaction
          this.parent.execute(this);
          return;
        }
      }
      
      // Finally we can compact something.
      // 確保compaction不為空
      assert this.compaction != null;

      // 執行之前
      this.compaction.getRequest().beforeExecute();
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CST logic.
        
    	// 執行開始時間
    	long start = EnvironmentEdgeManager.currentTime();
        
    	// 呼叫HRegion的compact,針對store執行compact
    	boolean completed = region.compact(compaction, store);
        
    	// 計算執行時間
    	long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        
        // 根據合併結果確定下一步操作
        if (completed) {// 如果合併成功
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
        	// 如果優先順序Priority小於等於0,意味著當前檔案已經太多,則需要發起一次SystemCompaction
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
        	// 請求分裂,實際上是看Region的大小是否超過閾值,從而引起分裂
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        server.checkFileSystem();
      } finally {
        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
      }
      
      // 請求合併之後的處理
      this.compaction.getRequest().afterExecute();
    }
        run()方法以上來,也是會首選做一些必要的環境判斷,比如HRegionServer是否已停止、HRegion對應的表是否允許Compact操作等。

        然後,針對compaction為null的情況,進行compaction的初始化,即待合併檔案的選擇。在這個過程之前,會先判斷下優先順序,之前的Compact優先順序賦值給oldPriority,獲取HStore的Compact優先順序,如果當前優先順序queuedPriority大於之前的oldPriority的話,即HStore下檔案數目減少了,則會推遲compact,可以優先進行flush,將該CompactionRunner再扔回執行緒池。如果優先順序滿足條件,則繼續,通過selectCompaction()選擇待合併檔案,並再次判斷下是應該在large池中執行還是應該在small池中執行,此次只根據上述的那個閾值來判斷。

        接下來,如果換池了,HStore呼叫cancelRequestedCompaction()方法取消合併請求,復位compaction為null,換池,並再次放入執行緒池,後續會再初始化compaction,然後就return。

        如果沒換池的話,確保compaction不為空,呼叫HRegion的compact,針對store執行compact,計算執行時間,並獲得compact的執行結果,根據合併結果確定下一步操作。

        如果合併成功,如果優先順序Priority小於等於0,意味著當前檔案已經太多,則需要發起一次SystemCompaction,否則請求分裂,實際上是看Region的大小是否超過閾值,從而引起分裂。

        整個CompactSplitThread的工作流程已描述完畢。那麼接下來的問題,就是何時什麼情況下會發起compact請求?發起的compact請求又有如何不同呢?是否會有定期檢查的工作執行緒,促使compact在滿足一定條件的情況下進行呢?

        且聽下回分解。