1. 程式人生 > >Hbase Java API簡單實踐(附原始碼解釋)

Hbase Java API簡單實踐(附原始碼解釋)

詳細程式碼及連結

  • maven依賴:hbase-client,slf4j-api,slf4j-nop(不需要hbase-server包)
  • resource中加入hdfs-site.xml配置檔案(不需要core-site.xml)
  • resource中放置log4j.properties檔案(HBase安裝目錄下conf檔案中的log4j.properties)
  • 完整程式碼如下(SomeHbaseAPI類與APITest類)


    SomeHbaseAPI.java
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author 王海
 * @version V1.0
 * @package
per.wanghai * @Description * @Date 2017/10/29 23:23 */
public class SomeHbaseAPI { private final Logger logger = LoggerFactory.getLogger(SomeHbaseAPI.class); protected void listTable(Admin admin) throws IOException { // 獲得HTableDescriptors // (所有namespace的表,相當於scan META) HTableDescriptor[] tableDescriptor = admin.listTables(); System.out.println("您的HBase有以下表:"
); for (int i = 0; i < tableDescriptor.length; i++) { System.out.println("表" + i + ":" + tableDescriptor[i].getNameAsString()); } } /** * @param columnFamilies(這是一個變長引數,“Varargs”機制只允許一個變長引數,且必須放在最後)詳見參考2 * @throws IOException * @Description 該方法建立一個table例項 */ protected void createTable(Admin admin, TableName tableName, String... columnFamilies) throws IOException { try { if (admin.tableExists(tableName)) { // "{}"是slf4j的佔位符(其一大特色) // DEBUG < INFO < WARN < ERROR < FATAL logger.warn("表:{}已經存在!", tableName.getNameAsString()); } else { // 標註2:關於HTableDescriptor: HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); for (String columnFamily : columnFamilies) { tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)); } admin.createTable(tableDescriptor); logger.info("表:{}建立成功!", tableName.getNameAsString()); } } finally { if (admin != null) { admin.close(); } } } /** * @throws IOException * @Description 一行一行的插入資料 * TODO:批量插入可以使用 Table.put(List<Put> list) */ protected void putOneByOne(Connection connection, TableName tableName, byte[] rowKey, String columnFamily, String column, String data) throws IOException { Table table = null; try { // 建立一個table例項 table = connection.getTable(tableName); // HBase中所有的資料最終都被轉化為byte[] // (rowKey已經在testCurd方法中轉換為byte[]) Put p = new Put(rowKey); // 檢視原始碼知:put的add方法已經被棄用 p.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data)); table.put(p); logger.info("表:{}已更新!!!", tableName.getNameAsString()); } finally { if (table != null) { table.close(); } } } /** * @param connection * @param tableName * @param str:一個字串陣列(rowkey,family,qualifier,value.迴圈) * @throws IOException */ protected void putList(Connection connection, TableName tableName, String[] str) throws IOException { // 每個put操作,我們放入四個資料 int count = str.length / 4; // 我們希望資料量是4的倍數,因為剩下的我們將不會寫入 int remainder = str.length % 4; if (remainder > 0) { logger.warn("資料可能並不會像您預期的那樣完全寫入,如有必要,請檢查下您的資料量!"); } Table table = null; try { // 建立一個table例項 table = connection.getTable(tableName); List<Put> puts = new ArrayList<>(); for (int i = 0; i < count; i++) { Put put = new Put(Bytes.toBytes(str[4 * i])); put.addColumn(Bytes.toBytes(str[4 * i + 1]), Bytes.toBytes(str[4 * i + 2]), Bytes.toBytes(str[4 * i + 3])); puts.add(put); } table.put(puts); logger.info("表:{}已使用putList方法更新!!!", tableName.getNameAsString()); } finally { if (table != null) { table.close(); } } } /** * @throws IOException * @Description 掃描表 * 想獲取部分行的資料,與putList方法類似,用List<Get>即可 */ protected void scan(Connection connection, TableName tableName) throws IOException { Table table = null; try { table = connection.getTable(tableName); /* 行的數目很大時,同時在一次請求中傳送大量資料,會佔用大量的系統資源並消耗很長時間, 所以ResultScanner類把掃描操作轉換為類似的get操作,它將每一行資料封裝成一個Result例項, 並將所有的Result例項放入一個迭代器中 */ ResultScanner rsScan1; ResultScanner rsScan2; // 這次操作返回表中所有的資料 Scan scan1 = new Scan(); rsScan1 = table.getScanner(scan1); for (Result r : rsScan1) { System.out.println(r); // 打印出來的Value是bytes型別 } rsScan1.close(); // 注:掃描器也使用同樣的租約超時機制,保護其不被失效的客戶單阻塞太久 // 超時時間配置:hbase.regionserver.lease.period // 同樣,也可以addfamily: Scan scan2 = new Scan(); scan2.addFamily(Bytes.toBytes("commonInfo")); rsScan2 = table.getScanner(scan2); for (Result r : rsScan2) { System.out.println(r); } rsScan2.close(); } finally { if (table != null) { table.close(); } } } /** * @throws IOException * @Description 根據row key獲取表中的該行資料 */ protected void getOneRow(Connection connection, TableName tableName, byte[] rowKey) throws IOException { Table table = null; try { table = connection.getTable(tableName); // 這種方法獲取指定rowkey的所有資訊(然後可以使用不同的方法獲取指定資訊) // 用rowKey來例項化get物件, Get all = new Get(rowKey); // Result類不是執行緒安全的 // 更多的使用方法見標註4 Result result = table.get(all); // 可以使用addColumn指定columnFamily與qualifier // 標註3:更多縮小獲取範圍的方法 /* 這裡使用addFamily獲取指定列族的所有列的資訊(一行) Get part = new Get(rowKey); part.addFamily(Bytes.toBytes("commonInfo")); Result result = table.get(part); ... ... */ /*通過getValue獲取指定資訊 不推薦用字串拼接的方式,字串拼接會不斷的建立新的物件, 而原來的物件就會變為垃圾被GC回收掉,如果拼接得次數多,這樣執行效率會很低底。 (見下方Cell中使用StringBuffer) String city = Bytes.toString(result.getValue(Bytes.toBytes("commonInfo"),Bytes.toBytes("city"))); String age = Bytes.toString(result.getValue(Bytes.toBytes("concelInfo"),Bytes.toBytes("age"))); System.out.println("city: " + city + "\t" + "age: " + age); */ // rawCells()返回cell[];注意:Cell介面中的getFamily、getValue等方法已經被廢棄 // 推薦:使用CellUtil中的一些列方法 for (Cell cell : result.rawCells()) { /* 與上方的String拼接不同,這樣的String拼接不會建立多個String物件 System.out.println( "RowNum : " + "\t" + Bytes.toString(CellUtil.cloneRow(cell)) + ", Family : " + "\t" + Bytes.toString(CellUtil.cloneFamily(cell)) + ", Qualifier : " + "\t" + Bytes.toString(CellUtil.cloneQualifier(cell)) + ", Value : " + "\t" + Bytes.toString(CellUtil.cloneValue(cell)) ); */ // 採用StringBuffer:(因為其是可變的字串物件,所以不會再建立新變數) StringBuffer sbuffer = new StringBuffer() .append("RowNum : \t") .append(Bytes.toString(CellUtil.cloneRow(cell))) .append(", Family : \t") .append(Bytes.toString(CellUtil.cloneFamily(cell))) .append(", Qualifier : \t") .append(Bytes.toString(CellUtil.cloneQualifier(cell))) .append(", Value : \t") .append(Bytes.toString(CellUtil.cloneValue(cell))); System.out.println(sbuffer); } } finally { if (table != null) { table.close(); } } } /** * @throws IOException * @Description 刪除表中的資料 */ protected void myDeleteTable(Admin admin, TableName tableName) throws IOException { try { if (admin.tableExists(tableName)) { // 必須先disable, 再delete myDisableTable(admin, tableName); // admin的很多方法在子類HBaseAdmin中實現 // TODO:沒看出該父類通過何種方式呼叫的子類方法 admin.deleteTable(tableName); logger.info("表:{}已刪除!!!", tableName.getNameAsString()); } else { logger.info("表:{}並不存在!!!", tableName.getNameAsString()); } } finally { if (admin != null) { admin.close(); } } } protected void myDisableTable(Admin admin, TableName tableName) throws IOException { try { // admin的很多方法在子類HBaseAdmin中實現 if (admin.tableExists(tableName)) { admin.disableTable(tableName); logger.info("表:{}已禁用!!!", tableName.getNameAsString()); } } finally { if (admin != null) { admin.close(); } } } }



APITest.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @author 王海[https://github.com/AtTops]
 * @version V1.0
 * @package PACKAGE_NAME
 * @Description
 * @Date 2017/10/31 11:43
 */
public class APITest {
    public static void main(String[] args) {
        new APITest().testCrud();
    }

    private void testCrud() {
        SomeHbaseAPI caller = new SomeHbaseAPI();
        /*
         * 標註1:詳解HBaseConfiguration
         */
        // 建立一個configuration物件 —— 告訴客戶端必要的配置資訊
        Configuration config = HBaseConfiguration.create();
        // 建立一個連線到叢集的connection
        Connection connection = null;
        /* Admin是一個介面類,其很多方法在子類HBaseAdmin中實現
         0.99版本開始:HBaseAdmin不再是客戶端API。它被標記為InterfaceAudience.Private,
         表示是一個HBase內部類。
         使用Connection.getAdmin()獲取Admin的例項,而不是直接構建一個HBaseAdmin。
         可以用來create、drop、list、enabl、disable表;add、drop 表的column families,以及一些其他的管理操作。*/
        try {
            connection = ConnectionFactory.createConnection(config);
            //
            Admin admin = connection.getAdmin();
            // 該方法傳遞一個String型別引數,返回TableName例項
            TableName tableName = TableName.valueOf("myHBaseTable");
            // 表不存在會報:TableNotFoundException

            // 獲取lists of table
            caller.listTable(admin);
            // 建立HBase表
            caller.createTable(admin, tableName, "commonInfo", "concelInfo");
            // rowkey要設計得儘量的短,資料的持久化檔案HFile中是按照KeyValue儲存的,
            // 如果rowkey過長,會極大影響HFile的儲存效率

            byte[] rowkey_bytes = Bytes.toBytes("ROW4");
            /* 一行一行的插入資料,每一次put操作都是一次有效的RPC(
             所以資料量大時不要這樣使用, 而應該使用BufferedMutator來實現批量的非同步寫操作。)
             這裡兩個列族,commonInfo列族兩個“小列”,concelInfo一個“小列”*/
            caller.putOneByOne(connection, tableName, rowkey_bytes, "commonInfo", "city", "Ziyang");
            caller.putOneByOne(connection, tableName, rowkey_bytes, "commonInfo", "password", "000000");
            caller.putOneByOne(connection, tableName, rowkey_bytes, "concelInfo", "age", "100");

            // 刪除表
//            caller.myDeleteTable(admin, tableName);

            // 獲取指定的資料
            caller.getOneRow(connection, tableName, rowkey_bytes);

            // 批量put資料
            String[] str = new String[]{"ROW5", "commonInfo", "city", "Shanghai", "ROW5"
                    , "concelInfo", "age", "35", "ROW6", "concelInfo", "age", "120", "Illegal_Value"};
            caller.putList(connection, tableName, str);

            // 刪除兩行資料
            Delete delete1 = new Delete(Bytes.toBytes("ROW5"));
            Delete delete = new Delete(Bytes.toBytes("ROW6"));
            /*也可以定義刪除的列族:
            其中addCaddColumn是刪除最新版本,addCaddColumns
            是刪除所有版本*/
            Table table = connection.getTable(tableName);
            table.delete(delete1);
            table.delete(delete);

            // scan表
            caller.scan(connection, tableName);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                // 最後記得關閉叢集
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

注:程式碼不用上傳到叢集執行,類似於JDBC,配置檔案中已經有了各種配置資訊,資料通過網路套接字進行傳輸(確定編寫程式碼的網路與叢集網路能通訊,否則可能會報Will not attempt to authenticate using SASL)

註釋

標註1

  • HBaseConfiguration 繼承了 hadoop.conf.Configuration
  • 該類的HBaseConfiguration()和HBaseConfiguration(Configuration c)
    構造方法已經被棄用,建議使用create方法
    LOG.warn(“instantiating HBaseConfiguration() is deprecated. Please use HBaseConfiguration.create() to construct a plain Configuration”);
  • HBaseConfiguration.create()方法首先呼叫hadoop的Configuration()構造conf物件;然後將該conf物件傳遞給addHbaseResources方法,該方法返回最終的“知道”各種配置資訊的conf物件。

    這裡寫圖片描述

    從這裡我們可知:resource資料夾只需要放入hbase-default.xml和hbase-site.xml這兩個配置檔案即可,然後將resource資料夾新增到classpath
    fjksv
  • *

標註2

  • Hbase2.0.0中,把HTableDescriptor標記為@Deprecated,並會在3.0.0版本被移除
  • HTableDescriptor包含有關HBase表的詳細資訊,例如所有列族的描述符、獲取列族數量、列族名字等等

標註3

要進一步縮小要獲取的範圍,請使用以下方法:

  1. 要從特定列族獲取所有列,請為每個列族執行addFamily進行檢索。
  2. 要獲取特定列(qualifier),請對要檢索的每個列執行addColumn。
  3. 要僅在特定範圍的版本時間戳內檢索列,請執行setTimeRange。
  4. 要僅檢索具有特定時間戳的列,請執行setTimestamp。
  5. 要限制要返回的每列的版本數,請執行setMaxVersions。
  6. 要新增過濾器,請呼叫setFilter。

標註4

Result類(可以直接返回各種Map結構和值)更多的使用方法:

  1. 要獲取Result中所有單元格的完整對映,包括多個系列和多個版本,使用getMap()。
  2. 要獲取每個 family到其列(qualifiers和values)的對映,僅包括每個列的最新版本,請使用getNoVersionMap()。
  3. 要獲得一個個別 family的限定符到最新值的對映,使用getFamilyMap(byte [])。
  4. 要獲取特定family和qualifiers的最新值,使用getValue(byte [],byte [])。返回的結果是Cell物件陣列,每個物件包含row, family, qualifier, timestamp, 和value.
  5. 可以通過方法listCells()訪問底層的Cell物件。這將從內部Cell []建立一個列表。

執行截圖:

建立表,插入三行資料:
這裡寫圖片描述

查看錶(驗證):
這裡寫圖片描述


list:
這裡寫圖片描述


通過getValue獲取指定資訊 & rawCells() 獲取指定行所有資訊
這裡寫圖片描述

這裡寫圖片描述


刪除與禁用表:
這裡寫圖片描述


Scan全表 & Scan特定列
這裡寫圖片描述


putList方法一次put多條資料:
這裡寫圖片描述

報錯及解決過程

1.Failed to load class “org.slf4j.impl.StaticLoggerBinder
在類路徑上放置一個(只有一個)slf4j-nop.jar,slf4j-simple.jar,slf4j-log4j12.jar,slf4j-jdk14.jar或logback-classic.jar可以解決問題(直接在maven中加入)
2.WARN No appenders could be found for logger (org.apache.hadoop.security.Groups).
沒有配置檔案log4j.xml或者log4j.properties,亦或者路徑不對
3.Will not attempt to authenticate using SASL
網路ping不通的原因
4.警告:java.io.IOException: No FileSystem for scheme: hdfs
試過新增core-site.xml和hdfs-site.xml,無效但是並不影響使用)

參考資源