1. 程式人生 > >HBase 系列(七)——HBase 過濾器詳解

HBase 系列(七)——HBase 過濾器詳解

一、HBase過濾器簡介

Hbase 提供了種類豐富的過濾器(filter)來提高資料處理的效率,使用者可以通過內建或自定義的過濾器來對資料進行過濾,所有的過濾器都在服務端生效,即謂詞下推(predicate push down)。這樣可以保證過濾掉的資料不會被傳送到客戶端,從而減輕網路傳輸和客戶端處理的壓力。

二、過濾器基礎

2.1 Filter介面和FilterBase抽象類

Filter 介面中定義了過濾器的基本方法,FilterBase 抽象類實現了 Filter 介面。所有內建的過濾器則直接或者間接繼承自 FilterBase 抽象類。使用者只需要將定義好的過濾器通過 setFilter

方法傳遞給 Scanput 的例項即可。

setFilter(Filter filter)
 // Scan 中定義的 setFilter
 @Override
  public Scan setFilter(Filter filter) {
    super.setFilter(filter);
    return this;
  }
  // Get 中定義的 setFilter
 @Override
  public Get setFilter(Filter filter) {
    super.setFilter(filter);
    return this;
  }

FilterBase 的所有子類過濾器如下:

說明:上圖基於當前時間點(2019.4)最新的 Hbase-2.1.4 ,下文所有說明均基於此版本。

2.2 過濾器分類

HBase 內建過濾器可以分為三類:分別是比較過濾器,專用過濾器和包裝過濾器。分別在下面的三個小節中做詳細的介紹。

三、比較過濾器

所有比較過濾器均繼承自 CompareFilter。建立一個比較過濾器需要兩個引數,分別是比較運算子和比較器例項。

 public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {
    this.compareOp = compareOp;
    this.comparator = comparator;
  }

3.1 比較運算子

  • LESS (<)
  • LESS_OR_EQUAL (<=)
  • EQUAL (=)
  • NOT_EQUAL (!=)
  • GREATER_OR_EQUAL (>=)
  • GREATER (>)
  • NO_OP (排除所有符合條件的值)

比較運算子均定義在列舉類 CompareOperator

@InterfaceAudience.Public
public enum CompareOperator {
  LESS,
  LESS_OR_EQUAL,
  EQUAL,
  NOT_EQUAL,
  GREATER_OR_EQUAL,
  GREATER,
  NO_OP,
}

注意:在 1.x 版本的 HBase 中,比較運算子定義在 CompareFilter.CompareOp 列舉類中,但在 2.0 之後這個類就被標識為 @deprecated ,並會在 3.0 移除。所以 2.0 之後版本的 HBase 需要使用 CompareOperator 這個列舉類。

3.2 比較器

所有比較器均繼承自 ByteArrayComparable 抽象類,常用的有以下幾種:

  • BinaryComparator : 使用 Bytes.compareTo(byte [],byte []) 按字典序比較指定的位元組陣列。
  • BinaryPrefixComparator : 按字典序與指定的位元組陣列進行比較,但只比較到這個位元組陣列的長度。
  • RegexStringComparator : 使用給定的正則表示式與指定的位元組陣列進行比較。僅支援 EQUALNOT_EQUAL 操作。
  • SubStringComparator : 測試給定的子字串是否出現在指定的位元組陣列中,比較不區分大小寫。僅支援 EQUALNOT_EQUAL 操作。
  • NullComparator :判斷給定的值是否為空。
  • BitComparator :按位進行比較。

BinaryPrefixComparatorBinaryComparator 的區別不是很好理解,這裡舉例說明一下:

在進行 EQUAL 的比較時,如果比較器傳入的是 abcd 的位元組陣列,但是待比較資料是 abcdefgh

  • 如果使用的是 BinaryPrefixComparator 比較器,則比較以 abcd 位元組陣列的長度為準,即 efgh 不會參與比較,這時候認為 abcdabcdefgh 是滿足 EQUAL 條件的;
  • 如果使用的是 BinaryComparator 比較器,則認為其是不相等的。

3.3 比較過濾器種類

比較過濾器共有五個(Hbase 1.x 版本和 2.x 版本相同),見下圖:

  • RowFilter :基於行鍵來過濾資料;
  • FamilyFilterr :基於列族來過濾資料;
  • QualifierFilterr :基於列限定符(列名)來過濾資料;
  • ValueFilterr :基於單元格 (cell) 的值來過濾資料;
  • DependentColumnFilter :指定一個參考列來過濾其他列的過濾器,過濾的原則是基於參考列的時間戳來進行篩選 。

前四種過濾器的使用方法相同,均只要傳遞比較運算子和運算器例項即可構建,然後通過 setFilter 方法傳遞給 scan

 Filter filter  = new RowFilter(CompareOperator.LESS_OR_EQUAL,
                                new BinaryComparator(Bytes.toBytes("xxx")));
  scan.setFilter(filter);    

DependentColumnFilter 的使用稍微複雜一點,這裡單獨做下說明。

3.4 DependentColumnFilter

可以把 DependentColumnFilter 理解為一個 valueFilter 和一個時間戳過濾器的組合。DependentColumnFilter 有三個帶參構造器,這裡選擇一個引數最全的進行說明:

DependentColumnFilter(final byte [] family, final byte[] qualifier,
                               final boolean dropDependentColumn, final CompareOperator op,
                               final ByteArrayComparable valueComparator)
  • family :列族
  • qualifier :列限定符(列名)
  • dropDependentColumn :決定參考列是否被包含在返回結果內,為 true 時表示參考列被返回,為 false 時表示被丟棄
  • op :比較運算子
  • valueComparator :比較器

這裡舉例進行說明:

DependentColumnFilter dependentColumnFilter = new DependentColumnFilter( 
    Bytes.toBytes("student"),
    Bytes.toBytes("name"),
    false,
    CompareOperator.EQUAL, 
    new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
  • 首先會去查詢 student:name 中值以 xiaolan 開頭的所有資料獲得 參考資料集,這一步等同於 valueFilter 過濾器;

  • 其次再用參考資料集中所有資料的時間戳去檢索其他列,獲得時間戳相同的其他列的資料作為 結果資料集,這一步等同於時間戳過濾器;

  • 最後如果 dropDependentColumn 為 true,則返回 參考資料集+結果資料集,若為 false,則拋棄參考資料集,只返回 結果資料集

四、專用過濾器

專用過濾器通常直接繼承自 FilterBase,適用於範圍更小的篩選規則。

4.1 單列列值過濾器 (SingleColumnValueFilter)

基於某列(參考列)的值決定某行資料是否被過濾。其例項有以下方法:

  • setFilterIfMissing(boolean filterIfMissing) :預設值為 false,即如果該行資料不包含參考列,其依然被包含在最後的結果中;設定為 true 時,則不包含;
  • setLatestVersionOnly(boolean latestVersionOnly) :預設為 true,即只檢索參考列的最新版本資料;設定為 false,則檢索所有版本資料。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                "student".getBytes(), 
                "name".getBytes(), 
                CompareOperator.EQUAL, 
                new SubstringComparator("xiaolan"));
singleColumnValueFilter.setFilterIfMissing(true);
scan.setFilter(singleColumnValueFilter);

4.2 單列列值排除器 (SingleColumnValueExcludeFilter)

SingleColumnValueExcludeFilter 繼承自上面的 SingleColumnValueFilter,過濾行為與其相反。

4.3 行鍵字首過濾器 (PrefixFilter)

基於 RowKey 值決定某行資料是否被過濾。

PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(prefixFilter);

4.4 列名字首過濾器 (ColumnPrefixFilter)

基於列限定符(列名)決定某行資料是否被過濾。

ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));
 scan.setFilter(columnPrefixFilter);

4.5 分頁過濾器 (PageFilter)

可以使用這個過濾器實現對結果按行進行分頁,建立 PageFilter 例項的時候需要傳入每頁的行數。

public PageFilter(final long pageSize) {
    Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
    this.pageSize = pageSize;
  }

下面的程式碼體現了客戶端實現分頁查詢的主要邏輯,這裡對其進行一下解釋說明:

客戶端進行分頁查詢,需要傳遞 startRow(起始 RowKey),知道起始 startRow 後,就可以返回對應的 pageSize 行資料。這裡唯一的問題就是,對於第一次查詢,顯然 startRow 就是表格的第一行資料,但是之後第二次、第三次查詢我們並不知道 startRow,只能知道上一次查詢的最後一條資料的 RowKey(簡單稱之為 lastRow)。

我們不能將 lastRow 作為新一次查詢的 startRow 傳入,因為 scan 的查詢區間是[startRow,endRow) ,即前開後閉區間,這樣 startRow 在新的查詢也會被返回,這條資料就重複了。

同時在不使用第三方資料庫儲存 RowKey 的情況下,我們是無法通過知道 lastRow 的下一個 RowKey 的,因為 RowKey 的設計可能是連續的也有可能是不連續的。

由於 Hbase 的 RowKey 是按照字典序進行排序的。這種情況下,就可以在 lastRow 後面加上 0 ,作為 startRow 傳入,因為按照字典序的規則,某個值加上 0 後的新值,在字典序上一定是這個值的下一個值,對於 HBase 來說下一個 RowKey 在字典序上一定也是等於或者大於這個新值的。

所以最後傳入 lastRow+0,如果等於這個值的 RowKey 存在就從這個值開始 scan,否則從字典序的下一個 RowKey 開始 scan。

25 個字母以及數字字元,字典排序如下:

'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'

分頁查詢主要實現邏輯:

byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);

int totalRows = 0;
byte[] lastRow = null;
while (true) {
    Scan scan = new Scan();
    scan.setFilter(filter);
    if (lastRow != null) {
        // 如果不是首行 則 lastRow + 0
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row: " +
                           Bytes.toStringBinary(startRow));
        scan.withStartRow(startRow);
    }
    ResultScanner scanner = table.getScanner(scan);
    int localRows = 0;
    Result result;
    while ((result = scanner.next()) != null) {
        System.out.println(localRows++ + ": " + result);
        totalRows++;
        lastRow = result.getRow();
    }
    scanner.close();
    //最後一頁,查詢結束  
    if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);

需要注意的是在多臺 Regin Services 上執行分頁過濾的時候,由於並行執行的過濾器不能共享它們的狀態和邊界,所以有可能每個過濾器都會在完成掃描前獲取了 PageCount 行的結果,這種情況下會返回比分頁條數更多的資料,分頁過濾器就有失效的可能。

4.6 時間戳過濾器 (TimestampsFilter)

List<Long> list = new ArrayList<>();
list.add(1554975573000L);
TimestampsFilter timestampsFilter = new TimestampsFilter(list);
scan.setFilter(timestampsFilter);

4.7 首次行鍵過濾器 (FirstKeyOnlyFilter)

FirstKeyOnlyFilter 只掃描每行的第一列,掃描完第一列後就結束對當前行的掃描,並跳轉到下一行。相比於全表掃描,其效能更好,通常用於行數統計的場景,因為如果某一行存在,則行中必然至少有一列。

FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
scan.set(firstKeyOnlyFilter);

五、包裝過濾器

包裝過濾器就是通過包裝其他過濾器以實現某些拓展的功能。

5.1 SkipFilter過濾器

SkipFilter 包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 例項時,則拓展過濾整行資料。下面是一個使用示例:

// 定義 ValueFilter 過濾器
Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,
      new BinaryComparator(Bytes.toBytes("xxx")));
// 使用 SkipFilter 進行包裝
Filter filter2 = new SkipFilter(filter1);

5.2 WhileMatchFilter過濾器

WhileMatchFilter 包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 例項時,WhileMatchFilter 則結束本次掃描,返回已經掃描到的結果。下面是其使用示例:

Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,
                               new BinaryComparator(Bytes.toBytes("rowKey4")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
    for (Cell cell : result.listCells()) {
        System.out.println(cell);
    }
}
scanner1.close();

System.out.println("--------------------");

// 使用 WhileMatchFilter 進行包裝
Filter filter2 = new WhileMatchFilter(filter1);

scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner1) {
    for (Cell cell : result.listCells()) {
        System.out.println(cell);
    }
}
scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0
rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0
rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0
rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0
rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0
--------------------
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0

可以看到被包裝後,只返回了 rowKey4 之前的資料。

六、FilterList

以上都是講解單個過濾器的作用,當需要多個過濾器共同作用於一次查詢的時候,就需要使用 FilterListFilterList 支援通過構造器或者 addFilter 方法傳入多個過濾器。

// 構造器傳入
public FilterList(final Operator operator, final List<Filter> filters)
public FilterList(final List<Filter> filters)
public FilterList(final Filter... filters)

// 方法傳入
 public void addFilter(List<Filter> filters)
 public void addFilter(Filter filter)

多個過濾器組合的結果由 operator 引數定義 ,其可選引數定義在 Operator 列舉類中。只有 MUST_PASS_ALLMUST_PASS_ONE 兩個可選的值:

  • MUST_PASS_ALL :相當於 AND,必須所有的過濾器都通過才認為通過;
  • MUST_PASS_ONE :相當於 OR,只有要一個過濾器通過則認為通過。
@InterfaceAudience.Public
  public enum Operator {
    /** !AND */
    MUST_PASS_ALL,
    /** !OR */
    MUST_PASS_ONE
  }

使用示例如下:

List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
                               new BinaryComparator(Bytes.toBytes("XXX")));
filters.add(filter1);

Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
                               new BinaryComparator(Bytes.toBytes("YYY")));
filters.add(filter2);

Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
                                     new RegexStringComparator("ZZZ"));
filters.add(filter3);

FilterList filterList = new FilterList(filters);

Scan scan = new Scan();
scan.setFilter(filterList);

參考資料

HBase: The Definitive Guide _> Chapter 4. Client API: Advanced Features

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南