1. 程式人生 > >HBase內建過濾器的一些總結

HBase內建過濾器的一些總結

轉載自:https://blog.csdn.net/cnweike/article/details/42920547

HBase為篩選資料提供了一組過濾器,通過這個過濾器可以在HBase中的資料的多個維度(行,列,資料版本)上進行對資料的篩選操作,也就是說過濾器最終能夠篩選的資料能夠細化到具體的一個儲存單元格上(由行鍵,列明,時間戳定位)。通常來說,通過行鍵,值來篩選資料的應用場景較多。

1. RowFilter:篩選出匹配的所有的行,對於這個過濾器的應用場景,是非常直觀的:使用BinaryComparator可以篩選出具有某個行鍵的行,或者通過改變比較運算子(下面的例子中是CompareFilter.CompareOp.EQUAL)來篩選出符合某一條件的多條資料,以下就是篩選出行鍵為row1的一行資料:

Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1"))); // OK 篩選出匹配的所有的行

2. PrefixFilter:篩選出具有特定字首的行鍵的資料。這個過濾器所實現的功能其實也可以由RowFilter結合RegexComparator來實現,不過這裡提供了一種簡便的使用方法,以下過濾器就是篩選出行鍵以row為字首的所有的行:

Filter pf = new PrefixFilter(Bytes.toBytes("row")); // OK  篩選匹配行鍵的字首成功的行

3. KeyOnlyFilter:這個過濾器唯一的功能就是隻返回每行的行鍵,值全部為空,這對於只關注於行鍵的應用場景來說非常合適,這樣忽略掉其值就可以減少傳遞到客戶端的資料量,能起到一定的優化作用:

Filter kof = new KeyOnlyFilter(); // OK 返回所有的行,但值全是空

4. RandomRowFilter:從名字上就可以看出其大概的用法,本過濾器的作用就是按照一定的機率(<=0會過濾掉所有的行,>=1會包含所有的行)來返回隨機的結果集,對於同樣的資料集,多次使用同一個RandomRowFilter會返回不通的結果集,對於需要隨機抽取一部分資料的應用場景,可以使用此過濾器:

Filter rrf = new RandomRowFilter((float) 0.8); // OK 隨機選出一部分的行

5. InclusiveStopFilter:掃描的時候,我們可以設定一個開始行鍵和一個終止行鍵,預設情況下,這個行鍵的返回是前閉後開區間,即包含起始行,但不包含終止行,如果我們想要同時包含起始行和終止行,那麼我們可以使用此過濾器:

Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // OK 包含了掃描的上限在結果之內

6. FirstKeyOnlyFilter:如果你只想返回的結果集中只包含第一列的資料,那麼這個過濾器能夠滿足你的要求。它在找到每行的第一列之後會停止掃描,從而使掃描的效能也得到了一定的提升:

Filter fkof = new FirstKeyOnlyFilter(); // OK 篩選出第一個每個第一個單元格

7. ColumnPrefixFilter:顧名思義,它是按照列名的字首來篩選單元格的,如果我們想要對返回的列的字首加以限制的話,可以使用這個過濾器:

Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1")); // OK 篩選出字首匹配的列

8. ValueFilter:按照具體的值來篩選單元格的過濾器,這會把一行中值不能滿足的單元格過濾掉,如下面的構造器,對於每一行的一個列,如果其對應的值不包含ROW2_QUAL1,那麼這個列就不會返回給客戶端:

Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1")); // OK 篩選某個(值的條件滿足的)特定的單元格

9. ColumnCountGetFilter:這個過濾器來返回每行最多返回多少列,並在遇到一行的列數超過我們所設定的限制值的時候,結束掃描操作:

Filter ccf = new ColumnCountGetFilter(2); // OK 如果突然發現一行中的列數超過設定的最大值時,整個掃描操作會停止

10. SingleColumnValueFilter:用一列的值決定這一行的資料是否被過濾。在它的具體物件上,可以呼叫setFilterIfMissing(true)或者setFilterIfMissing(false),預設的值是false,其作用是,對於咱們要使用作為條件的列,如果這一列本身就不存在,那麼如果為true,這樣的行將會被過濾掉,如果為false,這樣的行會包含在結果集中。

        SingleColumnValueFilter scvf = new SingleColumnValueFilter(
                Bytes.toBytes("colfam1"), 
                Bytes.toBytes("qual2"), 
                CompareFilter.CompareOp.NOT_EQUAL, 
                new SubstringComparator("BOGUS"));
        scvf.setFilterIfMissing(false);
        scvf.setLatestVersionOnly(true); // OK

11. SingleColumnValueExcludeFilter:這個與10種的過濾器唯一的區別就是,作為篩選條件的列的不會包含在返回的結果中。
12. SkipFilter:這是一種附加過濾器,其與ValueFilter結合使用,如果發現一行中的某一列不符合條件,那麼整行就會被過濾掉:

Filter skf = new SkipFilter(vf); // OK 發現某一行中的一列需要過濾時,整個行就會被過濾掉

13. WhileMatchFilter:這個過濾器的應用場景也很簡單,如果你想要在遇到某種條件資料之前的資料時,就可以使用這個過濾器;當遇到不符合設定條件的資料的時候,整個掃描也就結束了:

Filter wmf = new WhileMatchFilter(rf); // OK 類似於Python itertools中的takewhile

14. FilterList:用於綜合使用多個過濾器。其有兩種關係:FilterList.Operator.MUST_PASS_ONE和FilterList.Operator.MUST_PASS_ALL,預設的是FilterList.Operator.MUST_PASS_ALL,顧名思義,它們分別是AND和OR的關係,並且FilterList可以巢狀使用FilterList,使我們能夠表達更多的需求:

 

       List<Filter> filters = new ArrayList<Filter>();
        filters.add(rf);
        filters.add(vf);
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters); // OK 綜合使用多個過濾器, AND 和 OR 兩種關係

以上,是對於HBase內建的過濾器的部分總結,以下程式碼是資料寫入程式碼:

package com.reyun.hbase;
 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
 
public class HBaseDataFeeding {
    private final static byte[] ROW1 = Bytes.toBytes("row1");
    private final static byte[] ROW2 = Bytes.toBytes("row2");
    private final static byte[] COLFAM1 = Bytes.toBytes("colfam1");
    private final static byte[] COLFAM2 = Bytes.toBytes("colfam2");
    private final static byte[] QUAL1 = Bytes.toBytes("qual1");
    private final static byte[] QUAL2 = Bytes.toBytes("qual2");
    
    
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        table.setAutoFlushTo(false);
        Put put_row1 = new Put(ROW1);
        put_row1.add(COLFAM1, QUAL1, Bytes.toBytes("ROW1_QUAL1_VAL"));
        put_row1.add(COLFAM1, QUAL2, Bytes.toBytes("ROW1_QUAL2_VAL"));
        
        Put put_row2 = new Put(ROW2);
        put_row2.add(COLFAM1, QUAL1, Bytes.toBytes("ROW2_QUAL1_VAL"));
        put_row2.add(COLFAM1, QUAL2, Bytes.toBytes("ROW2_QUAL2_VAL"));
        
        try{
            table.put(put_row1);
            table.put(put_row2);
        }finally{
            table.close();
        }
    }
 
}

以下是過濾器測試程式碼,可以通過修改程式碼,更換過濾器來看到具體的效果:

package com.reyun.hbase;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.util.Bytes;
 
public class HBaseScannerTest {
 
    public static void main(String[] args) throws IOException, IllegalAccessException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        table.setAutoFlushTo(false);
        
        Scan scan1 = new Scan();
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(
                Bytes.toBytes("colfam1"), 
                Bytes.toBytes("qual2"), 
                CompareFilter.CompareOp.NOT_EQUAL, 
                new SubstringComparator("BOGUS"));
        scvf.setFilterIfMissing(false);
        scvf.setLatestVersionOnly(true); // OK
        Filter ccf = new ColumnCountGetFilter(2); // OK 如果突然發現一行中的列數超過設定的最大值時,整個掃描操作會停止
        Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1")); // OK 篩選某個(值的條件滿足的)特定的單元格
        Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual2")); // OK 篩選出字首匹配的列
        Filter fkof = new FirstKeyOnlyFilter(); // OK 篩選出第一個每個第一個單元格
        Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // OK 包含了掃描的上限在結果之內
        Filter rrf = new RandomRowFilter((float) 0.8); // OK 隨機選出一部分的行
        Filter kof = new KeyOnlyFilter(); // OK 返回所有的行,但值全是空
        Filter pf = new PrefixFilter(Bytes.toBytes("row")); // OK  篩選匹配行鍵的字首成功的行
        Filter rf = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("row1"))); // OK 篩選出匹配的所有的行
        Filter wmf = new WhileMatchFilter(rf); // OK 類似於Python itertools中的takewhile
        Filter skf = new SkipFilter(vf); // OK 發現某一行中的一列需要過濾時,整個行就會被過濾掉
        
        List<Filter> filters = new ArrayList<Filter>();
        filters.add(rf);
        filters.add(vf);
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters); // OK 綜合使用多個過濾器, AND 和 OR 兩種關係
        
        scan1.
        setStartRow(Bytes.toBytes("row1")).
        setStopRow(Bytes.toBytes("row3")).
        setFilter(scvf); 
        ResultScanner scanner1 = table.getScanner(scan1);
        
        for(Result res : scanner1){
            for(Cell cell : res.rawCells()){
                System.out.println("KV: " + cell + ", Value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
            System.out.println("------------------------------------------------------------");
        }
        
        scanner1.close();
        table.close();
    }
 
}