1. 程式人生 > >HBase Java API(1.2.X)使用簡介

HBase Java API(1.2.X)使用簡介

之前讀《HBase權威指南》在實踐時,發現API已經發生了一些變化,查閱官方文件,確認HBase的API在1.0版本後已經做了修改。本文介紹在新API下,使用Java訪問HBase的方法。
HBase Client通過查詢hbase:meta表來確定你所感興趣的資料行所在的RegionServers。在定位到這些資料所在的region後,client會直接和這些region所在的RegionServer聯絡進行資料的讀寫,而不會經由master進行資料的讀寫。Client還會快取這些資訊以使隨後的其他請求不需要再次經過這樣的查詢過程。Client只有在感知到因為master觸發了負載均衡導致region重新分配,或是RegionServer掛掉後,才會重新查詢定位請求讀寫的region。

1. 配置Configuration,獲取Connection例項
通過Client操作HBase,需要配置連結的引數,獲取Connection例項,然後從該例項中獲取使用者操作HBase的Table,Admin等物件。Connection例項是重量級的物件,但是它是執行緒安全的,因此client端可以只建立並維護好一個該例項。當所有的操作完成後,client應該負責關閉該例項。Table,Admin這些物件是輕量級的,當需要時就建立它,並在用完後釋放。

Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum"
, "aa.bb.cc.dd");
config.set("hbase.zookeeper.property.clientPort", "2181"); config.set("hbase.rootdir","hdfs://aa.bb.cc.dd:9000/hbase"); config.set("hbase.table.sanity.checks", "false"); Connection conn = ConnectionFactory.createConnection(config);

如上,建立並配置Configuration物件,並將其傳遞給ConnectionFactory,從中獲取Connection例項。Configuration物件引數的配置可以參考hbase-site.xml以及HDFS相關配置。ConnectionFactory是一個”non-instantiable class”,用於管理Connection的建立。

2. namespace
可以通過namespace指定表屬於的名稱空間,建表時如若不指定,則預設放在default。namespace指的是一個表的邏輯分組,可以更好的實現資料的隔離和同一類表的資源管理。

Connection conn = null;
Admin admin = null;
try {
    conn = ConnectionFactory.createConnection(config);
    admin = conn.getAdmin();
    NamespaceDescriptor[] nsList = admin.listNamespaceDescriptors();
    boolean found = false;
    for(NamespaceDescriptor ns:nsList){
        System.out.println("Namespace:" + ns.getName());
        if(ns.getName() != null && ns.getName().equals(nsName)){
            found = true;
            break;
        }
    }
    if(found){
        System.out.println("namespace:" + nsName +" exist, do not need to create");
        return;
    }
    admin.createNamespace(NamespaceDescriptor.create(nsName).build());
    System.out.println("Finish to create namespace");
} catch (Exception e) {

} finally{
    if(null != conn) {try{conn.close();}catch (Exception e) {e.printStackTrace();}}
    if(null != admin) {try{admin.close();}catch (Exception e) {e.printStackTrace();}}
}

上述程式碼展示了,如何查詢表空間,以及建立表空間。在新版本的API中Admin物件代替了原來的HBaseAdmin用於create/drop/list/enable/disable /modify表操作,當然也會執行其他管理性操作(compact/closeRegion等),在這裡使用了admin的createNamespace方法來建立表空間。NamespaceDescriptor用於描述Namespace。

3. 建立/修改表

/*
*nsName:表空間名
*tableName:表名稱
*/
admin = conn.getAdmin();

HTableDescriptor table = new HTableDescriptor(TableName.valueOf(nsName + ":" + tableName));
//如果表已經存在,先disable,再delete,然後重新建立
if(admin.tableExists(table.getTableName())){
    admin.disableTable(table.getTableName());
    admin.deleteTable(table.getTableName());
}
table.setDurability(Durability.SYNC_WAL);
//建立列簇
HColumnDescriptor hcd = new HColumnDescriptor("info-test");
hcd.setCompressTags(false);
hcd.setMaxVersions(3);

table.addFamily(hcd);
admin.createTable(table);
HTableDescriptor,HColumnDescriptor分別使用者描述表和列簇。如需刪除表,需要先將表disableTable而後才能執行deleteTable操作。HColumnDescriptor包含了列簇所含的最大版本個數,壓縮設定等。

//remove column family
admin.disableTable(table.getTableName());
table.remove(Bytes.toBytes("info-test"));
//add
HColumnDescriptor hcd = new HColumnDescriptor("info");
hcd.setMaxVersions(3);
table.addFamily(hcd);
//modify&enable
admin.modifyTable(table.getTableName(), table);
admin.enableTable(table.getTableName());

上述程式碼描述了怎樣刪除原有列簇,新增新的列簇;對於表的修改要堅守先disableTable在修改,最後enableTable的步驟。

4. Put/Get向表中存取資料
Put:向表中存入資料

/*
*nsName:表空間名
*tableName:表名稱
*/
Admin admin = null;
Table table = null;
try {
    admin = conn.getAdmin();

    TableName tName = TableName.valueOf(nsName + ":" + tableName);
    if(!admin.tableExists(tName)){
        return;
    }       
    table = conn.getTable(tName);
    ArrayList<Put> puts = new ArrayList<>();

    Put put = new Put(Bytes.toBytes("row-1"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("LinTao"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("28"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("school"), Bytes.toBytes("NJUPT"));
    puts.add(put);

    put = new Put(Bytes.toBytes("row-2"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("LinLei"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("29"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("school"), Bytes.toBytes("NJU"));
    puts.add(put);

    table.put(puts);
} catch (Exception e) {

} finally{
    if(null != conn) {try{conn.close();}catch (Exception e) {e.printStackTrace();}}
    if(null != admin) {try{admin.close();}catch (Exception e) {e.printStackTrace();}}
    if(null != table) {try{table.close();}catch (Exception e) {e.printStackTrace();}}
}

上面描述了怎樣向指定表空間的表中插入資料。新的Table物件已經替代了原來的HTable物件,用於HBase表通訊,執行get/put/delete/scan data from a table.Put用於建立一行資料,其建構函式的引數即是rowkey,良好的rowkey設計可以極大的提升HBase的存取效能。addColumn用於向指定列簇,限定符中新增列。設定或新增行的其他屬性的方法還有:

add(Cell cell) //Add the specified KeyValue to this Put operation.
addColumn(byte[] family, byte[] qualifier, long ts, byte[] value) //Add the specified column and value, with the specified timestamp as its version to this Put operation.
setTimestamp(long timestamp) //Set the timestamp of the delete.
setTTL(long ttl) //Set the TTL desired for the result of the mutation, in milliseconds.
……

每一個put操作實際上都是一個RPC操作,它將客戶端資料傳送到伺服器然後返回。如果有大量的這種put提交勢必會影響效能。HBase的API為客戶端配置了寫緩衝區,緩衝區收集put操作然後呼叫RPC一次性將多個put送往伺服器。可以設定緩衝區自動flush資料到服務端(table.setAutoFlush(true)),但這個通常不是最好的選擇,預設情況下AutoFlush時關閉的。API會追蹤統計當前緩衝區資料的大小,達到門限後會自動刷寫資料。此外當我們呼叫table.close()時也會觸發一次無條件的刷寫。

Get:獲取資料

table = conn.getTable(tName);
//get
Get get = new Get(Bytes.toBytes("row-1"));
Result result = table.get(get);
for(Cell cell:result.rawCells()){
    System.out.println(
            "RowKey :" + Bytes.toString(result.getRow()) +
            "Familiy:Qualifir :" + Bytes.toString(CellUtil.cloneQualifier(cell))+
            "Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}

獲取資料可以使用Get,可以設定Get的引數使用者獲取指定行,也可以指定列簇名,指定qualifier,指定timestamp等

addColumn(byte[] family, byte[] qualifier)//Get the column from the specific family with the specified qualifier.
addFamily(byte[] family)//Get all columns from the specified family.
setCacheBlocks(boolean cacheBlocks)//Set whether blocks should be cached for this Get.
setTimeRange(long minStamp, long maxStamp)//Get versions of columns only within the specified timestamp range, [minStamp, maxStamp).
setTimestamp(long timestamp)//Get versions of columns with the specified timestamp.

通過指定列簇名和限定符直接獲取結果值:

get.addColumn(Bytes.toBytes("row-1"), Bytes.toBytes("age"));
result = table.get(get);
result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"));

5. delete刪除資料
常用建構函式

Delete(byte[] row) //指定要刪除的行鍵
刪除行鍵指定行的資料。如果沒有進一步的設定,使用該建構函式將刪除行鍵指定的行中 所有列族中所有列的所有版本 !
Delete(byte[] row, long timestamp) //刪除行鍵和時間戳共同確定行的資料
常用方法:
deleteColumn(byte[] family, byte[] qualifier) //刪除指定列的 最新版本 的資料。
deleteColumns(byte[] family, byte[] qualifier) //刪除指定列的 所有版本的資料。
deleteFamily(byte[] family) //刪除指定列族的所有列的 所有 版本資料。
//刪除指定行,列簇中指定qualified的欄位值
table = conn.getTable(tName);
Delete delete = new Delete(Bytes.toBytes("row-1"));
delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));
table.delete(delete);

6. Scan:掃描

table = conn.getTable(tName);
//scanner
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
ResultScanner rs = table.getScanner(scan);
for(Result r = rs.next(); r != null; r =rs.next()){
    for(Cell cell:r.rawCells()){
        System.out.println( CellUtil.cloneRow(cell)
                + " => "+ Bytes.toString(CellUtil.cloneFamily(cell))
                + "," + cell.getTimestamp()
                + ", {" + Bytes.toString(CellUtil.cloneQualifier(cell))
                + ":" + Bytes.toString(CellUtil.cloneValue(cell))+ "}");
    }
}

scan通常意味了消耗更多的效能,我們只展示Scan方法的使用,工程實踐中,應該通過設定過濾器,限定時間範圍,指定起始列,配置快取等要優化。
對ResultScanner的每一次next()呼叫都會為每行資料生成一個單獨的RPC請求,如果想讓一次PRC呼叫獲得多行資料,則需要開啟掃描器快取:

//Set the number of rows for caching that will be passed to scanners.
setCaching(int caching)

setCaching可以控制每次RPC取回的行數,但是如果設定的過大,導致返回給客戶端的資料超出其堆的大小,程式就會丟擲OOM

//Set the maximum number of cells to return for each call to next().
setBatch(int batch)

setBatch用於控制每次取回的列數,因為如果某行資料量非常大,返回的這些行有可能超過客戶端程序的記憶體容量。
綜上快取是面向行一級操作的優化,批量則是面向列一級的操作。