1. 程式人生 > >hbase命令實踐與java api程式碼實踐

hbase命令實踐與java api程式碼實踐

名稱空間(namespace): 預設為default名稱空間
list_namespace
create_namespace ‘my_ns’
create ‘my_ns:my_table’,’fam’
exists ‘my_ns:my_table’
list
list_namespace
disable ‘my_ns:my_table’
drop ‘my_ns:my_table’
drop_namespace ‘my_ns’

rowkey與family: rowkey行鍵唯一,family列簇,每一個數據項都有時間戳
put ‘my_ns:my_table’,’rowkey1’,’fam:f1’,’r1f1’
put ‘my_ns:my_table’,’rowkey1’,’fam:f2’,’r1f2’
put ‘my_ns:my_table’,’rowkey1’,’fam:f3’,’r1f3’
put ‘my_ns:my_table’,’rowkey2’,’fam:f1’,’r2f1’
put ‘my_ns:my_table’,’rowkey2’,’fam:f2’,’r2f2’
put ‘my_ns:my_table’,’rowkey2’,’fam:f3’,’r2f3’
put ‘my_ns:my_table’,’rowkey3’,’fam:f1’,’r3f1’
put ‘my_ns:my_table’,’rowkey3’,’fam:f2’,’r3f2’
put ‘my_ns:my_table’,’rowkey3’,’fam:f3’,’r3f3’
list ‘my_ns:my_table’
scan ‘my_ns:my_table’
get ‘my_ns:my_table’,’rowkey1’
get ‘my_ns:my_table’,’rowkey1’,’fam’
get ‘my_ns:my_table’,’rowkey1’,’fam:f1’
delete ‘my_ns:my_table’,’rowkey1’,’fam:f1’
delete ‘my_ns:my_table’,’rowkey2’,’fam:f2’
delete ‘my_ns:my_table’,’rowkey3’,’fam:f3’
scan ‘my_ns:my_table’

hbase java api:

步驟一:關閉hbase機器上的防火牆,確保在本地能與遠端hbase建立tcp連線:service iptable stop

步驟二: maven專案中新增依賴

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.0</version>
</dependency>

注意:有個小問題,jdk1.6的tools.jar自動下載不到,得自己手動去下個包把依賴寫上,hbase-client裡面依賴了這個包,如下

<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.6</version>
    <scope>system</scope>
    <systemPath>E:/downloads/jar/tools.jar</systemPath>
</dependency>

步驟三:本地機器需要加上hbase伺服器hostname的ip對映,我在hosts檔案裡添加了如下一行:
192.168.137.10 centos001

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClient {

    private Configuration configuration;
    private Connection connection;
    private Admin admin;

    private static HBaseClient instance;

    private HBaseClient() throws IOException{
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", 
                "192.168.137.10:2181,192.168.137.10:2182,192.168.137.10:2183");
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    };

    public static HBaseClient getInstance() throws IOException{
        if(instance==null){
            synchronized (HBaseClient.class) {
                if(instance==null){
                    instance = new HBaseClient();
                }
            }
        }
        return instance;
    }

    public void close() throws IOException{
        if(admin!=null){
            admin.close();
        }
        if(connection!=null){
            connection.close();
        }
    }

    /**
     * 判斷名稱空間是否存在
     * @param strNamespace 名稱空間
     * @return true-存在,false-不存在
     * @throws IOException
     */
    public boolean isExistsNamespace(String strNamespace) throws IOException{
        NamespaceDescriptor[] namespaces = admin.listNamespaceDescriptors();
        for(int i=0; i<namespaces.length; i++){
            if(strNamespace.equals(namespaces[i].getName())){
                return true;
            }
        }
        return false;
    }

    /**
     * 建立名稱空間
     * @param strNamespace 名稱空間
     * @return true-建立成功,false-存在該namespace
     * @throws IOException
     */
    public boolean createNamespace(String strNamespace) throws IOException{
        if(isExistsNamespace(strNamespace)){
            return false;
        }else{
            admin.createNamespace(NamespaceDescriptor.create(strNamespace).build());
            return true;
        }
    }

    /**
     * 判斷表是否存在
     * @param strTableName 表名
     * @return true-存在,false-不存在
     * @throws IOException
     */
    public boolean isExistsTable(String strTableName) throws IOException{
        TableName tableName = TableName.valueOf(strTableName);
        return admin.tableExists(tableName);
    }

    /**
     * 建立表,存在同名表時不刪除同名表也不新建表
     * @param strTableName 表名
     * @param strFamily 列簇名
     * @return true-成功建立,false-已存在同名表,未新建表
     * @throws IOException
     */
    public boolean createTable(String strTableName, String strFamily) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        if (admin.tableExists(tableName)) {
            return false;
        }
        HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor family = new HColumnDescriptor(strFamily);
        hTableDescriptor.addFamily(family);
        admin.createTable(hTableDescriptor);
        return true;
    }

    /**
     * 建立表,存在同名表時刪除同名表然後新建表
     * @param strTableName 表名
     * @param strFamily 列簇名
     * @return 表建立完成後返回true
     * @throws IOException
     */
    public boolean createTableForced(String strTableName, String strFamily) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        if (admin.tableExists(tableName)) {
            deleteTable(strTableName);
        }
        HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor family = new HColumnDescriptor(strFamily);
        hTableDescriptor.addFamily(family);
        admin.createTable(hTableDescriptor);
        return true;
    }

    /**
     * 插入資料
     * @param strTableName 表名
     * @param put 插入資料物件
     * @throws IOException
     */
    public void insertData(String strTableName, Put put) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        table.put(put);
    }

    /**
     * 插入資料
     * @param strTableName 表名
     * @param putList 插入資料物件集合
     * @throws IOException
     */
    public void insertData(String strTableName, List<Put> putList) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        table.put(putList);
    }

    /**
     * 掃描表
     * @param strTableName 表名
     * @return ResultScanner 掃描結果物件
     * @throws IOException
     */
    public ResultScanner queryTable(String strTableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        ResultScanner scanner = table.getScanner(new Scan());
        return scanner;
    }

    /**
     * 獲取指定行的資料
     * @param strTableName 表名
     * @param get 查詢條件物件
     * @return 查詢結果物件
     * @throws IOException
     */
    public Result queryTableByRowKey(String strTableName, Get get) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        Result result = table.get(get);
        return result;
    }

    /**
     * 根據Filter條件物件掃描表
     * @param strTableName 表名
     * @param filter Filter條件物件
     * @return 掃描結果物件
     * @throws IOException
     */
    public ResultScanner queryTableByFilter(String strTableName, Filter filter) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner scanner = table.getScanner(scan);
        return scanner;
    }

    /**
     * 根據Filter條件物件列表掃描表
     * @param strTableName 表名
     * @param filters Filter條件物件集合
     * @return 掃描結果集合
     * @throws IOException
     */
    public ResultScanner queryTableByFilters(String strTableName, List<Filter> filters) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        FilterList filterList = new FilterList(filters);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner scanner = table.getScanner(scan);
        return scanner;
    }

    /**
     * 新增列
     * @param strTableName 表名
     * @param strColumn 列簇名
     * @throws IOException
     */
    public void addColumn(String strTableName, String strColumn) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        HColumnDescriptor columnDescriptor = new HColumnDescriptor(strColumn);
        admin.addColumn(tableName, columnDescriptor);
    }

    /**
     * 刪除列
     * @param strTableName 表名
     * @param strColumn 列簇名
     * @throws IOException
     */
    public void deleteColumn(String strTableName, String strColumn) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        admin.deleteColumn(tableName, strColumn.getBytes());
    }

    /**
     * 根據rowkey刪除行
     * @param strTableName 表名
     * @param rowkey 行名
     * @throws IOException
     */
    public void deleteByRowKey(String strTableName, String rowkey) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        table.delete(delete);
    }

    /**
     * 刪除行
     * @param strTableName 表名
     * @param list 刪除資料集合
     * @throws IOException
     */
    public void deleteRow(String strTableName, List<Delete> list) throws IOException {
        Table table = connection.getTable(TableName.valueOf(strTableName));
        table.delete(list);
    }

    /**
     * 根據Filter條件物件刪除行
     * @param strTableName 表名
     * @param filter Filter條件物件
     * @throws IOException
     */
    public void deleteByFilter(String strTableName, Filter filter) throws IOException {
        ResultScanner scanner = queryTableByFilter(strTableName, filter);
        List<Delete> list = new ArrayList<Delete>();
        for (Result result : scanner) {
            Delete delete = new Delete(result.getRow());
            list.add(delete);
        }
        deleteRow(strTableName, list);
        scanner.close();
    }

    /**
     * 截斷表
     * @param strTableName 表名
     * @throws IOException
     */
    public void truncateTable(String strTableName) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        admin.disableTable(tableName);
        admin.truncateTable(tableName, true);
    }

    /**
     * 刪除表
     * @param strTableName 表名
     * @throws IOException
     */
    public void deleteTable(String strTableName) throws IOException {
        TableName tableName = TableName.valueOf(strTableName);
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    }

    public static void main(String[] args) throws IOException {
        HBaseClient test = HBaseClient.getInstance();
        test.testCreateNamespace();
        test.testCreateTable();
        test.testInsertData();
        test.testQuery();
        test.testQueryByRow();
        test.testQueryByFilter();
        test.testQueryByFilters();
        test.testAddColumn();
        test.testDeleteColumn();
        test.testDeleteRow();
        test.testDeleteTable();
        test.close();
    }

    public void testCreateNamespace() throws IOException{
        createNamespace("ns1");
    }

    public void testCreateTable() throws IOException{
        createTableForced("ns1:t1", "cf");
    }

    public void testInsertData() throws IOException {
        List<Put> putList = new ArrayList<Put>();
        Put put = null;
        for (int i = 0; i < 10; i++) {
            put = new Put(Bytes.toBytes("row" + i));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("r" + i + "c1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c2"), Bytes.toBytes("r" + i + "c2"));
            putList.add(put);
        }
        insertData("ns1:t1", putList);
    }

    public void testQuery() throws IOException{
        ResultScanner scanner = queryTable("ns1:t1");
        for (Result result : scanner) {
            byte[] row = result.getRow();
            System.out.println("row key is:" + new String(row));
            List<Cell> listCells = result.listCells();
            for (Cell cell : listCells) {
                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();
                System.out.println("row value is:" + new String(familyArray)
                        + new String(qualifierArray) + new String(valueArray));
            }
        }
        scanner.close();
    }

    public void testQueryByRow() throws IOException{
        Get get = new Get("row5".getBytes());
        Result result = queryTableByRowKey("ns1:t1", get);
        byte[] row = result.getRow();
        System.out.println("row key is:" + new String(row));
        List<Cell> listCells = result.listCells();
        for (Cell cell : listCells) {
            byte[] familyArray = cell.getFamilyArray();
            byte[] qualifierArray = cell.getQualifierArray();
            byte[] valueArray = cell.getValueArray();
            System.out.println("row value is:" + new String(familyArray)
                    + new String(qualifierArray) + new String(valueArray));
        }
    }

    public void testQueryByFilter() throws IOException{
        Filter filter = new SingleColumnValueFilter(
                Bytes.toBytes("cf"), Bytes.toBytes("c1"),
                CompareOp.EQUAL, Bytes.toBytes("r3c1"));
        ResultScanner scanner = queryTableByFilter("ns1:t1", filter);
        for (Result result : scanner) {
            byte[] row = result.getRow();
            System.out.println("row key is:" + new String(row));
            List<Cell> listCells = result.listCells();
            for (Cell cell : listCells) {
                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();
                System.out.println("row value is:" + new String(familyArray)
                        + new String(qualifierArray) + new String(valueArray));
            }
        }
        scanner.close();
    }

    public void testQueryByFilters() throws IOException{
        List<Filter> filters = new ArrayList<Filter>();
        Filter filter1 = new SingleColumnValueFilter(
                Bytes.toBytes("cf"), Bytes.toBytes("c1"),
                CompareOp.EQUAL, Bytes.toBytes("r5c1"));
        Filter filter2 = new SingleColumnValueFilter(
                Bytes.toBytes("cf"), Bytes.toBytes("c2"),
                CompareOp.EQUAL, Bytes.toBytes("r5c2"));
        filters.add(filter1);
        filters.add(filter2);
        ResultScanner scanner = queryTableByFilters("ns1:t1", filters);
        for (Result result : scanner) {
            byte[] row = result.getRow();
            System.out.println("row key is:" + new String(row));
            List<Cell> listCells = result.listCells();
            for (Cell cell : listCells) {
                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();
                System.out.println("row value is:" + new String(familyArray)
                        + new String(qualifierArray, "UTF-8") + new String(valueArray, "UTF-8"));
            }
        }
        scanner.close();
    }

    public void testAddColumn() throws IOException{
        addColumn("ns1:t1", "fam");
        testQueryByRow();
    }

    public void testDeleteColumn() throws IOException{
        deleteColumn("ns1:t1", "fam");
        testQueryByRow();
    }

    public void testDeleteRow() throws IOException{
        List<Delete> list = new ArrayList<Delete>();
        Delete delete = new Delete(Bytes.toBytes("row4"));
        list.add(delete);
        deleteRow("ns1:t1",list);
        testQuery();
    }

    public void testDeleteTable() throws IOException{
        truncateTable("ns1:t1");
        testQuery();
        deleteTable("ns1:t1");
        System.out.println(admin.tableExists(TableName.valueOf("ns1:t1")));
    }

}