HBase Source Code Analysis: The RegionServer Read Path

Data reads come in two forms, Get and Scan. The get code below shows that a Get is in fact converted into a Scan and handled the same way.

//HRegion.java
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {

    List<Cell> results = new ArrayList<Cell>();
    ...
    Scan scan = new Scan(get);

    RegionScanner scanner = null;
    try {
      scanner = getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null)
        scanner.close();
    }
    ...
    return results;
}

Next, look at how getScanner works. After checking the requested families, getScanner calls instantiateRegionScanner.

//HRegion.java
protected RegionScanner instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    if (scan.isReversed()) {
      if (scan.getFilter() != null) {
        scan.getFilter().setReversed(true);
      }
      return new ReversedRegionScannerImpl(scan, additionalScanners, this);
    }
    return new RegionScannerImpl(scan, additionalScanners, this);
}

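A different RegionScanner object is returned depending on the kind of scan.
A forward scan gets a RegionScannerImpl; a reversed scan gets a ReversedRegionScannerImpl.
Start with the implementation of RegionScannerImpl.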

RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
        throws IOException {

      this.region = region;
      this.maxResultSize = scan.getMaxResultSize();
      if (scan.hasFilter()) {
        this.filter = new FilterWrapper(scan.getFilter());
      } else {
        this.filter = null;
      }

      /**
       * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
       * scanner context that can be used to enforce the batch limit in the event that a
       * ScannerContext is not specified during an invocation of next/nextRaw
       */
      defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
        this.stopRow = null;
      } else {
        this.stopRow = scan.getStopRow();
      }
      // If we are doing a get, we want to be [startRow,endRow] normally
      // it is [startRow,endRow) and if startRow=endRow we get nothing.
      this.isScan = scan.isGetScan() ? -1 : 0;

      // synchronize on scannerReadPoints so that nobody calculates
      // getSmallestReadPoint, before scannerReadPoints is updated.
      IsolationLevel isolationLevel = scan.getIsolationLevel();
      synchronized(scannerReadPoints) {
        this.readPt = getReadpoint(isolationLevel);
        scannerReadPoints.put(this, this.readPt);
      }

      // Here we separate all scanners into two lists - scanner that provide data required
      // by the filter to operate (scanners list) and all others (joinedScanners list).
      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
      if (additionalScanners != null) {
        scanners.addAll(additionalScanners);
      }

      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    }

Here additionalScanners is null.
scan.doLoadColumnFamiliesOnDemand() is false by default for a scan.
filter.isFamilyEssential(entry.getKey()) returns true for filters by default.
So the scanner of every family ends up in the scanners list.
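The split between scanners and joinedScanners can be illustrated with a minimal sketch. It is not HBase code: the class ScannerPartitionSketch, its partition method, and the use of plain family names instead of real scanners are assumptions made for illustration. Only the condition itself mirrors the constructor above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the partitioning loop in RegionScannerImpl.
class ScannerPartitionSketch {
    /** Families read eagerly (scanners) vs. families read only for rows the filter accepts. */
    static final class Partition {
        final List<String> scanners = new ArrayList<>();
        final List<String> joinedScanners = new ArrayList<>();
    }

    static Partition partition(List<String> families, boolean hasFilter,
                               boolean loadOnDemand, Predicate<String> isFamilyEssential) {
        Partition p = new Partition();
        for (String family : families) {
            // Same shape as: filter == null || !doLoadColumnFamiliesOnDemand() || isFamilyEssential(...)
            if (!hasFilter || !loadOnDemand || isFamilyEssential.test(family)) {
                p.scanners.add(family);
            } else {
                p.joinedScanners.add(family);
            }
        }
        return p;
    }

    public static void main(String[] args) {
        List<String> families = Arrays.asList("cf1", "cf2");
        // Default case: on-demand loading is off, so every family is essential.
        Partition defaults = partition(families, true, false, f -> f.equals("cf1"));
        System.out.println(defaults.scanners + " " + defaults.joinedScanners);
        // On-demand loading on, filter only needs cf1: cf2 moves to joinedScanners.
        Partition onDemand = partition(families, true, true, f -> f.equals("cf1"));
        System.out.println(onDemand.scanners + " " + onDemand.joinedScanners);
    }
}
```

With the default settings described above, the second list stays empty, which is why the rest of this walkthrough only follows the main heap.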
Next, look at HStore's getScanner method.

public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte []> targetCols, long readPt) throws IOException {
    lock.readLock().lock();
    try {
      KeyValueScanner scanner = null;
      if (this.getCoprocessorHost() != null) {
        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
      }
      if (scanner == null) {
        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
            getScanInfo(), scan, targetCols, readPt);
      }
      return scanner;
    } finally {
      lock.readLock().unlock();
    }
  }

As above, a different KeyValueScanner is returned depending on the kind of scan.
A forward scan gets a StoreScanner; a reversed scan gets a ReversedStoreScanner.
Start with StoreScanner.

private StoreScanner(final Scan scan, ScanInfo scanInfo,
      ScanType scanType, final NavigableSet<byte[]> columns,
      final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
          throws IOException {
    this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
        scanInfo.getMinVersions(), readPt);
    this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);

    // In unit tests, the store could be null
    if (this.store != null) {
      this.store.addChangedReaderObserver(this);
    }
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
    resetKVHeap(scanners, scanInfo.getComparator());
  }

  /**
   * Get a filtered list of scanners. Assumes we are not in a compaction.
   * @return list of scanners to seek
   */
  protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
    final boolean isCompaction = false;
    boolean usePread = isGet || scanUsePread;
    return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
        isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
  }

HStore.getScanners returns a scanner for every storefile (StoreFileScanner) plus the scanner for the memstore (MemStoreScanner).
StoreScanner.selectScannersFrom then filters these scanners by bloom filter, time range, and TTL.
Finally, the remaining KeyValueScanners are wrapped in a KeyValueHeap.
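The time-range and TTL part of that filtering can be sketched as follows. This is a simplified illustration under stated assumptions, not the HBase implementation: FileInfo, select, and oldestUnexpiredTs as a plain parameter are hypothetical, the scan time range is assumed to be half-open [scanMin, scanMax), and bloom filter checks are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of pruning store files before they are scanned.
class ScannerSelectionSketch {
    /** Assumed per-file summary: the range of cell timestamps the file contains. */
    static final class FileInfo {
        final String name;
        final long minTimestamp;
        final long maxTimestamp;
        FileInfo(String name, long minTimestamp, long maxTimestamp) {
            this.name = name;
            this.minTimestamp = minTimestamp;
            this.maxTimestamp = maxTimestamp;
        }
    }

    /**
     * Keeps a file only if it may hold a cell inside [scanMin, scanMax)
     * and its newest cell has not expired (timestamp >= oldestUnexpiredTs).
     */
    static List<String> select(List<FileInfo> files, long scanMin, long scanMax,
                               long oldestUnexpiredTs) {
        List<String> kept = new ArrayList<>();
        for (FileInfo f : files) {
            boolean overlapsTimeRange = f.minTimestamp < scanMax && f.maxTimestamp >= scanMin;
            boolean hasUnexpiredData = f.maxTimestamp >= oldestUnexpiredTs;
            if (overlapsTimeRange && hasUnexpiredData) {
                kept.add(f.name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<FileInfo> files = new ArrayList<>();
        files.add(new FileInfo("old", 0, 99));
        files.add(new FileInfo("mid", 100, 199));
        files.add(new FileInfo("new", 200, 299));
        // "old" lies entirely before the scan's time range and is skipped.
        System.out.println(select(files, 150, 250, 120));
    }
}
```

The point of the pruning is that a file that cannot contribute a visible cell never enters the heap, so it costs nothing during the merge.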
StoreFileScanner, MemStoreScanner, and StoreScanner all implement the KeyValueScanner interface.
[Figure: class diagram of the KeyValueScanner implementations]
The relationships among these scanners are as follows.
1. A scan request sent to a Region returns one RegionScanner.
2. Each family of the Region gets its own StoreScanner; inside the RegionScanner these StoreScanners are held in a KeyValueHeap.
3. Each StoreScanner holds one or more StoreFileScanners (one per storefile) and one MemStoreScanner; these scanners are also held in a KeyValueHeap.
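The three points above describe a merge nested inside another merge. The sketch below illustrates that shape with plain Java collections; it is not HBase code. TwoLevelMergeSketch, Peeking, merge, and drain are hypothetical names, cells are reduced to strings of the form row/family:qualifier, and String ordering stands in for the real cell comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical sketch: a heap of sorted sources, nested two levels deep.
class TwoLevelMergeSketch {
    /** Wraps a sorted source and remembers its next element, like a scanner's peek(). */
    static final class Peeking {
        private final Iterator<String> it;
        private String head;
        Peeking(Iterator<String> it) { this.it = it; advance(); }
        private void advance() { head = it.hasNext() ? it.next() : null; }
        String peek() { return head; }
        String next() { String r = head; advance(); return r; }
    }

    /** K-way merge of sorted sources. The result is an Iterator, so merges can be nested. */
    static Iterator<String> merge(List<Iterator<String>> sources) {
        PriorityQueue<Peeking> heap = new PriorityQueue<>(Comparator.comparing(Peeking::peek));
        for (Iterator<String> s : sources) {
            Peeking p = new Peeking(s);
            if (p.peek() != null) heap.add(p);   // empty sources never enter the heap
        }
        return new Iterator<String>() {
            @Override public boolean hasNext() { return !heap.isEmpty(); }
            @Override public String next() {
                Peeking top = heap.poll();
                if (top == null) throw new NoSuchElementException();
                String cell = top.next();
                if (top.peek() != null) heap.add(top);  // re-insert while it still has data
                return cell;
            }
        };
    }

    static List<String> drain(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        // Level 1: one merge per family over its store file(s) and memstore.
        Iterator<String> cf1 = merge(Arrays.asList(
            Arrays.asList("r1/cf1:a", "r2/cf1:a").iterator(),   // store file
            Arrays.asList("r1/cf1:b").iterator()));              // memstore
        Iterator<String> cf2 = merge(Arrays.asList(
            Arrays.asList("r1/cf2:x", "r3/cf2:x").iterator()));
        // Level 2: the region-level merge over the per-family merges.
        System.out.println(drain(merge(Arrays.asList(cf1, cf2))));
    }
}
```

Because each level only ever looks at the head of its sources, all cells of one row come out together across families before the next row starts.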
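A single rowkey can have data in several families, each family can hold several columns, and each column can have several versions. How is the data for one rowkey located and then merged together? Look at RegionScanner.nextRaw, the method that fetches the data.
First, its declaration:
boolean nextRaw(List<Cell> result) throws IOException;
The fetched data is placed in the result list that is passed in.
nextRaw calls the nextInternal method.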

// HRegion.java
// class RegionScannerImpl
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
        // For now, focus only on how data is fetched from storeHeap
        // storeHeap is the KeyValueHeap, mentioned earlier, that holds the StoreScanners
        ...

        // Let's see what we have in the storeHeap.
        Cell current = this.storeHeap.peek();

        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getRowArray();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }

        ...
          // Check if rowkey filter wants to exclude this row. If so, loop to next.
          // Technically, if we hit limits before on this row, we don't need this call.
          if (filterRowKey(currentRow, offset, length)) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            results.clear();
            continue;
          }

          // Ok, we are good, let's try to get some results from the main heap.
          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

          ...
    }
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
        ScannerContext scannerContext, byte[] currentRow, int offset, short length)
        throws IOException {
      Cell nextKv;
      boolean moreCellsInRow = false;
      boolean tmpKeepProgress = scannerContext.getKeepProgress();
      // Scanning between column families and thus the scope is between cells
      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
      do {
        // We want to maintain any progress that is made towards the limits while scanning across
        // different column families. To do this, we toggle the keep progress flag on during calls
        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
        scannerContext.setKeepProgress(true);
        heap.next(results, scannerContext);
        scannerContext.setKeepProgress(tmpKeepProgress);

        nextKv = heap.peek();
        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);

        if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
        } else if (scannerContext.checkSizeLimit(limitScope)) {
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        } else if (scannerContext.checkTimeLimit(limitScope)) {
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        }
      } while (moreCellsInRow);

      return nextKv != null;
    }

The data is fetched through KeyValueHeap's peek() and next(List result) methods. Next, look at the implementation of KeyValueHeap.

public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
    implements KeyValueScanner, InternalScanner {
  // A priority queue holds all of the KeyValueScanners
  protected PriorityQueue<KeyValueScanner> heap = null;

  /**
   * The current sub-scanner, i.e. the one that contains the next key/value
   * to return to the client. This scanner is NOT included in {@link #heap}
   * (but we frequently add it back to the heap and pull the new winner out).
   * We maintain an invariant that the current sub-scanner has already done
   * a real seek, and that current.peek() is always a real key/value (or null)
   * except for the fake last-key-on-row-column supplied by the multi-column
   * Bloom filter optimization, which is OK to propagate to StoreScanner. In
   * order to ensure that, always use {@link #pollRealKV()} to update current.
   */
  // The current KeyValueScanner is the top scanner polled from the heap.
  // Once polled it is no longer in the heap; it is added back to the heap later.
  protected KeyValueScanner current = null;

  protected KVScannerComparator comparator;
  }
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
    if (this.current == null) {
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current;
    // Write data into result
    boolean moreCells = currentAsInternal.next(result, scannerContext);
    Cell pee = this.current.peek();

    /*
     * By definition, any InternalScanner must return false only when it has no
     * further rows to be fetched. So, we can close a scanner if it returns
     * false. All existing implementations seem to be fine with this. It is much
     * more efficient to close scanners which are not needed than keep them in
     * the heap. This is also required for certain optimizations.
     */

    // Close the current KeyValueScanner once it has no more data
    if (pee == null || !moreCells) {
      this.current.close();
    } else {
      this.heap.add(this.current);
    }
    this.current = null;
    this.current = pollRealKV();
    if (this.current == null) {
      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }
    return moreCells;
  }
// Poll the top scanner from the heap
protected KeyValueScanner pollRealKV() throws IOException {
    // Take the top kvScanner
    KeyValueScanner kvScanner = heap.poll();
    if (kvScanner == null) {
      return null;
    }

    while (kvScanner != null && !kvScanner.realSeekDone()) {
      if (kvScanner.peek() != null) {
        try {
          // Perform the real seek to the requested position
          kvScanner.enforceSeek();
        } catch (IOException ioe) {
          kvScanner.close();
          throw ioe;
        }
        Cell curKV = kvScanner.peek();
        if (curKV != null) {
          // Peek at the scanner now on top of the heap without removing it.
          KeyValueScanner nextEarliestScanner = heap.peek();
          if (nextEarliestScanner == null) {
            // The heap is empty. Return the only possible scanner.
            return kvScanner;
          }

          // Compare the current scanner to the next scanner. We try to avoid
          // putting the current one back into the heap if possible.
          // The polled scanner has moved after its seek, so it must be compared again
          Cell nextKV = nextEarliestScanner.peek();
          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
            // We already have the scanner with the earliest KV, so return it.
            // The first Cell of the polled scanner is still the smallest
            return kvScanner;
          }

          // Otherwise, put the scanner back into the heap and let it compete
          // against all other scanners (both those that have done a "real
          // seek" and a "lazy seek").
          // The first Cell of the polled scanner is no longer the smallest, so put the scanner back into the heap
          heap.add(kvScanner);
        } else {
          // Close the scanner because we did a real seek and found out there
          // are no more KVs.
          kvScanner.close();
        }
      } else {
        // Close the scanner because it has already run out of KVs even before
        // we had to do a real seek on it.
        kvScanner.close();
      }
      // Poll the next top scanner from the heap and repeat the process
      kvScanner = heap.poll();
    }

    return kvScanner;
  }
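
The idea of holding the current scanner outside the heap, and skipping a heap insertion when it is still the smallest, can be shown in isolation. The sketch below is a deliberate simplification and not how KeyValueHeap is written: it folds the re-add from next() and the comparison from pollRealKV() into one loop, ignores lazy seeks, and merges plain integers. Cursor and mergeAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merge with a "current" source kept outside the heap.
class PollWithoutReinsertSketch {
    /** A sorted source with a peekable head, standing in for a scanner. */
    static final class Cursor {
        private final int[] values;
        private int pos;
        Cursor(int... values) { this.values = values; }
        Integer peek() { return pos < values.length ? values[pos] : null; }
        void advance() { pos++; }
    }

    static List<Integer> mergeAll(List<Cursor> cursors) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparing(Cursor::peek));
        for (Cursor c : cursors) {
            if (c.peek() != null) heap.add(c);
        }
        List<Integer> out = new ArrayList<>();
        Cursor current = heap.poll();          // current is NOT in the heap
        while (current != null) {
            out.add(current.peek());
            current.advance();
            Integer head = current.peek();
            Cursor next = heap.peek();          // look at the runner-up without removing it
            if (head == null) {
                current = heap.poll();          // exhausted: drop it (the real code closes it)
            } else if (next == null || head < next.peek()) {
                // Still holds the smallest element: keep it as current, no heap traffic.
            } else {
                heap.add(current);              // let it compete with the others again
                current = heap.poll();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mergeAll(Arrays.asList(
            new Cursor(1, 2, 7), new Cursor(3, 4), new Cursor(5))));
    }
}
```

When one source supplies long runs of consecutive keys, the fast path avoids a pair of heap operations per element.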

Next, see how the heap decides which scanner sits on top.
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(), this.comparator);
The PriorityQueue is constructed with a KVScannerComparator, which wraps a KVComparator; the KVComparator actually passed in is the region's comparator.

// HRegion.java
this.comparator = fs.getRegionInfo().getComparator();
...
public KVComparator getComparator() {
    return isMetaRegion()?
      KeyValue.META_COMPARATOR: KeyValue.COMPARATOR;
  }

// KeyValue.java
public static final KVComparator COMPARATOR = new KVComparator();
// KeyValueHeap.java
// class KVScannerComparator
public int compare(KeyValueScanner left, KeyValueScanner right) {
      int comparison = compare(left.peek(), right.peek());
      if (comparison != 0) {
        return comparison;
      } else {
        // Since both the keys are exactly the same, we break the tie in favor
        // of the key which came latest.
        long leftSequenceID = left.getSequenceID();
        long rightSequenceID = right.getSequenceID();
        if (leftSequenceID > rightSequenceID) {
          return -1;
        } else if (leftSequenceID < rightSequenceID) {
          return 1;
        } else {
          return 0;
        }
      }
    }

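KVScannerComparator.compare first compares the first Cell of each KeyValueScanner. If the two are equal, it compares the sequenceIDs of the scanners. The head of a PriorityQueue is its smallest element, while the scanner with the largest sequenceId holds the newest data, so the comparison is inverted to put the scanner with the larger sequenceId at the head.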
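A small sketch makes the inverted tie-break concrete. It is an illustration only: Source, ORDER, and headOf are hypothetical names, and a String key stands in for the first Cell of a scanner.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch of "smallest key first, newest source first on ties".
class ScannerTieBreakSketch {
    static final class Source {
        final String name;
        final String headKey;     // stands in for scanner.peek()
        final long sequenceId;    // larger means newer data
        Source(String name, String headKey, long sequenceId) {
            this.name = name;
            this.headKey = headKey;
            this.sequenceId = sequenceId;
        }
    }

    static final Comparator<Source> ORDER = (left, right) -> {
        int byKey = left.headKey.compareTo(right.headKey);
        if (byKey != 0) {
            return byKey;
        }
        // Arguments swapped on purpose: the larger sequenceId must sort first.
        return Long.compare(right.sequenceId, left.sequenceId);
    };

    /** Returns the name of the source that ends up at the head of the queue. */
    static String headOf(Source... sources) {
        PriorityQueue<Source> heap = new PriorityQueue<>(ORDER);
        for (Source s : sources) {
            heap.add(s);
        }
        Source head = heap.peek();
        return head == null ? null : head.name;
    }

    public static void main(String[] args) {
        // Same key in both sources: the newer one wins.
        System.out.println(headOf(new Source("older", "row1", 5), new Source("newer", "row1", 9)));
        // Different keys: the smaller key wins regardless of sequenceId.
        System.out.println(headOf(new Source("older", "row0", 5), new Source("newer", "row1", 9)));
    }
}
```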
Cells themselves are compared by CellComparator.compare.

// CellComparator.java
public static int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
    // row
    int c = compareRows(a, b);
    if (c != 0) return c;

    c = compareWithoutRow(a, b);
    if(c != 0) return c;

    if (!ignoreSequenceid) {
      // Negate following comparisons so later edits show up first
      // mvccVersion: later sorts first
      return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
    } else {
      return c;
    }
  }

  // CellComparator.compareWithoutRow (leading part elided)
    ...
    if (rightCell.getFamilyLength() + rightCell.getQualifierLength() == 0
        && rightCell.getTypeByte() == Type.Minimum.getCode()) {
      return -1;
    }
    boolean sameFamilySize = (leftCell.getFamilyLength() == rightCell.getFamilyLength());
    if (!sameFamilySize) {
      // comparing column family is enough.

      return Bytes.compareTo(leftCell.getFamilyArray(), leftCell.getFamilyOffset(),
          leftCell.getFamilyLength(), rightCell.getFamilyArray(), rightCell.getFamilyOffset(),
          rightCell.getFamilyLength());
    }
    int diff = compareColumns(leftCell, rightCell);
    if (diff != 0) return diff;

    diff = compareTimestamps(leftCell, rightCell);
    if (diff != 0) return diff;

    // Compare types. Let the delete types sort ahead of puts; i.e. types
    // of higher numbers sort before those of lesser numbers. Maximum (255)
    // appears ahead of everything, and minimum (0) appears after
    // everything.
    return (0xff & rightCell.getTypeByte()) - (0xff & leftCell.getTypeByte());
  }

  public static int compareTimestamps(final Cell left, final Cell right) {
    long ltimestamp = left.getTimestamp();
    long rtimestamp = right.getTimestamp();
    return compareTimestamps(ltimestamp, rtimestamp);
  }

private static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
    // The below older timestamps sorting ahead of newer timestamps looks
    // wrong but it is intentional. This way, newer timestamps are first
    // found when we iterate over a memstore and newer versions are the
    // first we trip over when reading from a store file.
    if (ltimestamp < rtimestamp) {
      return 1;
    } else if (ltimestamp > rtimestamp) {
      return -1;
    }
    return 0;
  }

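The comparison proceeds through rowkey, family, column, and timestamp in that order. The first three are compared with Bytes.compareTo, that is, in lexicographic byte order. For the timestamp, a larger value means newer data, so the comparison is again inverted to keep cells with larger timestamps in front. Cells that are still equal are ordered by type, with higher type codes (the delete types) ahead of puts.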
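The resulting sort order is easiest to see on a handful of cells. The sketch below is an approximation under stated assumptions: SimpleCell and its String fields are hypothetical, String.compareTo replaces the byte-wise Bytes.compareTo, the mvcc comparison is left out, and the constants PUT = 4 and DELETE = 8 are illustrative type codes chosen so that the delete sorts ahead of the put.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the cell ordering: row, family, qualifier ascending,
// then timestamp descending, then type code descending.
class CellOrderSketch {
    static final int PUT = 4;      // illustrative type codes
    static final int DELETE = 8;

    static final class SimpleCell {
        final String row;
        final String family;
        final String qualifier;
        final long timestamp;
        final int type;
        SimpleCell(String row, String family, String qualifier, long timestamp, int type) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.timestamp = timestamp;
            this.type = type;
        }
        @Override public String toString() {
            return row + "/" + family + ":" + qualifier + "/" + timestamp + "/" + type;
        }
    }

    static int compare(SimpleCell a, SimpleCell b) {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        c = a.family.compareTo(b.family);
        if (c != 0) return c;
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) return c;
        // Arguments swapped: newer timestamps sort first.
        c = Long.compare(b.timestamp, a.timestamp);
        if (c != 0) return c;
        // Arguments swapped: higher type codes (deletes) sort first.
        return Integer.compare(b.type, a.type);
    }

    static List<String> sorted(List<SimpleCell> cells) {
        List<SimpleCell> copy = new ArrayList<>(cells);
        copy.sort(CellOrderSketch::compare);
        List<String> out = new ArrayList<>();
        for (SimpleCell cell : copy) out.add(cell.toString());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sorted(Arrays.asList(
            new SimpleCell("r1", "cf", "a", 10, PUT),
            new SimpleCell("r1", "cf", "a", 20, PUT),
            new SimpleCell("r1", "cf", "a", 20, DELETE),
            new SimpleCell("r1", "cf", "b", 5, PUT),
            new SimpleCell("r0", "cf", "z", 1, PUT))));
    }
}
```

A reader walking this order meets the newest version of each column first, and meets a delete before the put it masks at the same timestamp.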
ReversedStoreScanner, by contrast, uses a ReversedKeyValueHeap, whose comparator is a ReversedKVScannerComparator:

//ReversedKVScannerComparator.java
public int compare(KeyValueScanner left, KeyValueScanner right) {
      int rowComparison = compareRows(left.peek(), right.peek());
      if (rowComparison != 0) {
        return -rowComparison;
      }
      return super.compare(left, right);
    }

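This negates the result of the row comparison, so rows come out in descending order, while ties within the same row still fall through to the normal ordering in super.compare.
ReversedKeyValueHeap.next() then moves the scan toward smaller rows.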
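The effect of negating only the row comparison can be checked with a short sketch. Key, forward, reversed, and sortReversed are hypothetical names, and a row/column pair of Strings stands in for a Cell.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: rows descending, normal order inside each row.
class ReversedOrderSketch {
    static final class Key {
        final String row;
        final String column;
        Key(String row, String column) { this.row = row; this.column = column; }
        @Override public String toString() { return row + "/" + column; }
    }

    /** Normal ordering: row ascending, then column ascending. */
    static int forward(Key a, Key b) {
        int c = a.row.compareTo(b.row);
        return c != 0 ? c : a.column.compareTo(b.column);
    }

    /** Reversed ordering: only the row comparison is negated. */
    static int reversed(Key a, Key b) {
        int rowComparison = a.row.compareTo(b.row);
        if (rowComparison != 0) {
            return -rowComparison;
        }
        return forward(a, b);   // same row: fall back to the normal order
    }

    static List<String> sortReversed(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(ReversedOrderSketch::reversed);
        List<String> out = new ArrayList<>();
        for (Key k : copy) out.add(k.toString());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sortReversed(Arrays.asList(
            new Key("r1", "a"), new Key("r2", "b"), new Key("r2", "a"), new Key("r3", "a"))));
    }
}
```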
That completes the analysis of the server-side read path.