1. 程式人生 > >hbase 原始碼解析(21) 自定義過濾器

hbase 原始碼解析(21) 自定義過濾器

自定義 filter 需要繼承抽象類 Filter,或者(更方便地)繼承 FilterBase
  1. @InterfaceAudience.Public
  2. @InterfaceStability.Stable
  3. public abstract class Filter{
  4. //return codes produced by filterKeyValue(Cell)
  5. public enum ReturnCode{
  6. INCLUDE, //include this cell in the result
  7. INCLUDE_AND_NEXT_COL, //include this cell, then move on to the next column
  8. SKIP, //skip this KeyValue and go on to process the next one
  9. NEXT_COL, //skip the rest of the current column
  10. NEXT_ROW, //skip the rest of the current row
  11. SEEK_NEXT_USING_HINT, //seek to the next matching position; requires a call to getNextKeyHint()
  12. }
  13. protected transient boolean reversed; //flag exposed through setReversed()/isReversed() below
  14. abstract public void reset() throws IOException;
  15. //checks whether the row key qualifies; a row that does not can be skipped up front, avoiding the remaining checks (e.g. a prefix filter)
  16. abstract public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException;
  17. //lets the filter end the scan early
  18. abstract public boolean filterAllRemaining() throws IOException;
  19. //processes a single cell
  20. abstract public ReturnCode filterKeyValue(final Cell v) throws IOException;
  21. abstract public Cell transformCell(final Cell v) throws IOException;
  22. @Deprecated// use Cell transformCell(final Cell)
  23. abstract public KeyValue transform(final KeyValue currentKV) throws IOException;
  24. //after the preceding steps, if cells remain, the whole current row is processed together (e.g. a dependent filter — presumably DependentColumnFilter; verify)
  25. abstract public void filterRowCells(List<Cell> kvs) throws IOException;
  26. abstract public boolean hasFilterRow();
  27. //last per-row check if cells survived everything above, e.g. a page filter deciding whether a full page has already been collected
  28. abstract public boolean filterRow() throws IOException;
  29. @Deprecated// presumably superseded by getNextCellHint(final Cell) below — confirm
  30. abstract public KeyValue getNextKeyHint(final KeyValue currentKV) throws IOException;
  31. abstract public Cell getNextCellHint(final Cell currentKV) throws IOException;
  32. abstract public boolean isFamilyEssential(byte[] name) throws IOException;
  33. abstract public byte[] toByteArray() throws IOException;
  34. public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException { //always throws here; the message says it should be called on the derived type
  35. throw new DeserializationException(
  36. "parseFrom called on base Filter, but should be called on derived type");
  37. }
  38. abstract boolean areSerializedFieldsEqual(Filter other);
  39. public void setReversed(boolean reversed) {
  40. this.reversed = reversed;
  41. }
  42. public boolean isReversed() {
  43. return this.reversed;
  44. }
  45. }
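流程如下:scan 或者 get 是呼叫的入口,基本流程就是下面這樣。但是在這個方法裡沒有看到 filterKeyValue,如果你們找到了,告訴我一聲。(補充:filterKeyValue 應該不是在這個方法裡直接呼叫的,而是在 populateResult 往下,經 StoreScanner 由 ScanQueryMatcher.match() 對每個 cell 呼叫。)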
  1. private boolean nextInternal(List<Cell> results,ScannerContext scannerContext)
  2. throws IOException{
  3. while(true){ //NOTE(review): this listing looks abridged — currentRow/offset/length are not declared here and the `if` matching the later `} else {` is not shown
  4. boolean stopRow = isStopRow(currentRow, offset, length);
  5. boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
  6. if (hasFilterRow) {
  7. if (LOG.isTraceEnabled()) {
  8. LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
  9. + " formed. Changing scope of limits that may create partials");
  10. }
  11. scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
  12. scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
  13. }
  14. if (filterRowKey(currentRow, offset, length)) { //row key rejected: count it, move to the next row and retry the loop
  15. incrementCountOfRowsFilteredMetric(scannerContext);
  16. // early check, see HBASE-16296
  17. //this is where filterAllRemaining actually gets called (per the original author's note; isFilterDoneInternal is not shown here)
  18. if (isFilterDoneInternal()) {
  19. return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  20. }
  21. incrementCountOfRowsScannedMetric(scannerContext);
  22. //nextRow calls filter.reset() internally (per the original author's note; nextRow is not shown here)
  23. boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
  24. if (!moreRows) {
  25. return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  26. }
  27. results.clear();
  28. continue;
  29. }
  30. populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
  31. Cell nextKv = this.storeHeap.peek();
  32. stopRow = nextKv == null ||
  33. isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
  34. final boolean isEmptyRow = results.isEmpty();
    1. FilterWrapper.FilterRowRetCode ret =FilterWrapper.FilterRowRetCode.NOT_CALLED;
  35. if(hasFilterRow){
  36. //calls filterRowCells(List); the original note also names "filterRowCells(cell)", which is not declared on Filter — presumably filterRow() is meant; verify against FilterWrapper
  37. ret = filter.filterRowCellsWithRet(results);
  38. long timeProgress = scannerContext.getTimeProgress(); //saved so it can be restored after progress is reset below
  39. if (scannerContext.getKeepProgress()) {
  40. scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
  41. initialTimeProgress);
  42. } else {
  43. scannerContext.clearProgress();
  44. }
  45. scannerContext.setTimeProgress(timeProgress);
  46. scannerContext.incrementBatchProgress(results.size());
  47. for (Cell cell : results) {
  48. scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
  49. }
  50. }
  51. if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { //row dropped: empty, excluded by the wrapper, or rejected by filterRow()
  52. incrementCountOfRowsFilteredMetric(scannerContext);
  53. results.clear();
  54. boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
  55. if (!moreRows) {
  56. return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  57. }
  58. if (!stopRow) continue;
  59. return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  60. }
  61. if (this.joinedHeap != null) {
  62. boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
  63. if (mayHaveData) {
  64. joinedContinuationRow = current;
  65. populateFromJoinedHeap(results, scannerContext);
  66. if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
  67. return true;
  68. }
  69. }
  70. }
  71. } else { //NOTE(review): the matching `if` is missing from this excerpt — presumably a check on joinedContinuationRow; confirm against the full source
  72. populateFromJoinedHeap(results, scannerContext);
  73. if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
  74. return true;
  75. }
  76. }
  77. if (stopRow) {
  78. return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
  79. } else {
  80. return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
  81. }
  82. }
  83. }
就這樣。自定義完成後,打成 jar,需要 export HBASE_CLASSPATH(指向該 jar),或者將 jar 放到 hbase 安裝目錄的 lib 下面,然後重啟 hbase。