
[Newbie Series] HBase Java API, based on HBase 1.1.2

First, an overview of the main API classes.

HBaseConfiguration

org.apache.hadoop.hbase.HBaseConfiguration
Adds HBase configuration files to a Configuration

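We usually obtain a Configuration through HBaseConfiguration.create() and then set some parameters on it, such as the ZooKeeper addresses and client port, and whether Kerberos authentication is enabled.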

Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "bd36,bd37,bd38,bd66,bd67");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("zookeeper.znode.parent", "/hbase-unsecure");
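A minimal sketch that wraps the three settings above in one factory method. The host names are the author's; the class name ClusterConfig is hypothetical. The value /hbase-unsecure is what some distributions (for example HDP) use for non-Kerberos clusters, while stock HBase defaults to /hbase, so it has to match the server's zookeeper.znode.parent.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class ClusterConfig {
    // HBaseConfiguration.create() first loads hbase-default.xml and any
    // hbase-site.xml on the classpath; the set() calls below override them.
    static Configuration create(String quorum, int clientPort, String znodeParent) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);        // comma-separated ZooKeeper hosts
        conf.setInt("hbase.zookeeper.property.clientPort", clientPort);
        conf.set("zookeeper.znode.parent", znodeParent);   // must match the server side
        return conf;
    }

    public static void main(String[] args) {
        Configuration conf = create("bd36,bd37,bd38,bd66,bd67", 2181, "/hbase-unsecure");
        System.out.println(conf.get("hbase.zookeeper.quorum"));
    }
}
```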

Connection

org.apache.hadoop.hbase.client.Connection
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper. Connections are instantiated through the ConnectionFactory class. The lifecycle of the connection is managed by the caller, who has to close() the connection to release the resources.
The connection object contains logic to find the master, locate regions out on the cluster, keeps a cache of locations and then knows how to re-calibrate after they move. The individual connections to servers, meta cache, zookeeper connection, etc are all shared by the Table and Admin instances obtained from this connection.
Connection creation is a heavy-weight operation. Connection implementations are thread-safe, so that the client can create a connection once, and share it with different threads. Table and Admin instances, on the other hand, are light-weight and are not thread-safe. Typically, a single connection per client application is instantiated and every thread will obtain its own Table instance. Caching or pooling of Table and Admin is not recommended.

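The official description is long-winded. In one sentence: a Connection is what you use to connect to HBase, and, again, it is created through a factory class, ConnectionFactory. Create it once, share it across threads, and close() it when you are done.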

Connection connection = ConnectionFactory.createConnection(configuration);
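The Javadoc's advice (one heavyweight, thread-safe Connection shared by the whole application, and a lightweight, non-thread-safe Table per thread) can be sketched as follows. The table name, row keys, and pool size are illustrative assumptions, the class and method names are hypothetical, and Table.exists(Get) is used only as a cheap read. It assumes hbase-site.xml is on the classpath and a cluster is reachable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class SharedConnectionSketch {
    // Each task borrows the shared Connection but opens (and closes) its own
    // Table, because Table instances are not thread-safe.
    static Callable<Boolean> existsTask(Connection conn, TableName name, byte[] row) {
        return () -> {
            try (Table table = conn.getTable(name)) {
                return table.exists(new Get(row));
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // assumes hbase-site.xml on the classpath
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // One Connection for the whole application, closed exactly once.
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            TableName name = TableName.valueOf("jiangtao:test");
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                results.add(pool.submit(existsTask(conn, name, Bytes.toBytes("rowkey_" + i))));
            }
            for (Future<Boolean> f : results) {
                System.out.println(f.get()); // waits for each task to finish
            }
        } finally {
            pool.shutdown();
        }
    }
}
```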

Admin

org.apache.hadoop.hbase.client.Admin
Admin can be used to create, drop, list, enable and disable and otherwise modify tables, as well as perform other administrative operations.
Since: 0.99.0

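This interface is used for operations such as creating, deleting, enabling, and disabling tables. Older code instantiated the HBaseAdmin class directly; constructing it that way is deprecated, so use the newer Admin interface instead. How do we obtain an Admin? From the Connection: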

Admin admin = connection.getAdmin();
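Admin is also Closeable, so it is convenient to open one per administrative task with try-with-resources. The sketch below additionally creates the namespace before creating a namespaced table: the full program creates jiangtao:test, which fails unless the namespace jiangtao already exists. The class and method names are hypothetical.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

class AdminSketch {
    // Pure helper: is the namespace among the ones the cluster reported?
    static boolean containsNamespace(NamespaceDescriptor[] existing, String namespace) {
        for (NamespaceDescriptor nd : existing) {
            if (nd.getName().equals(namespace)) {
                return true;
            }
        }
        return false;
    }

    // Creates the namespace (if missing), then the table (if missing).
    static void createIfAbsent(Connection conn, TableName name, String... families) throws IOException {
        try (Admin admin = conn.getAdmin()) {
            String ns = name.getNamespaceAsString();
            if (!containsNamespace(admin.listNamespaceDescriptors(), ns)) {
                admin.createNamespace(NamespaceDescriptor.create(ns).build());
            }
            if (admin.tableExists(name)) {
                return;
            }
            HTableDescriptor desc = new HTableDescriptor(name);
            for (String cf : families) {
                desc.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(desc);
        }
    }
}
```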

TableName

org.apache.hadoop.hbase.TableName

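This class represents a table's name: it converts our string (the table name, optionally in namespace:table form) into the form HBase understands.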

TableName tname = TableName.valueOf(tablename);
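A quick way to see how TableName.valueOf parses its argument: a namespace:table string is split at the colon, and a bare name lands in the default namespace. This sketch (with a hypothetical helper name) needs only the HBase client jar, not a running cluster.

```java
import org.apache.hadoop.hbase.TableName;

class TableNameSketch {
    // Returns {namespace, qualifier} for a table name string.
    static String[] parts(String name) {
        TableName t = TableName.valueOf(name);
        return new String[] { t.getNamespaceAsString(), t.getQualifierAsString() };
    }

    public static void main(String[] args) {
        System.out.println(String.join(" / ", parts("jiangtao:test"))); // jiangtao / test
        System.out.println(String.join(" / ", parts("test")));          // default / test
    }
}
```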

HTableDescriptor

org.apache.hadoop.hbase.HTableDescriptor
HTableDescriptor contains the details about an HBase table such as the descriptors of all the column families, is the table a catalog table, hbase:meta , if the table is read only, the maximum size of the memstore, when the region split should occur, coprocessors associated with it etc…
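Note, however, that this class is on its way out (the deprecation applies from HBase 2.0.0; in 1.1.2 it is still the standard API):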
Deprecated.
As of release 2.0.0, this will be removed in HBase 3.0.0. Use TableDescriptorBuilder to build HTableDescriptor.

This is the class that describes a table.

HTableDescriptor tDescriptor = new HTableDescriptor(tname);
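In 1.1.2, HTableDescriptor is also where the table-level properties from the Javadoc quote are set. The sketch below builds a two-family descriptor and sets the region split size and memstore flush size; the numbers and the helper name are illustrative assumptions, not recommendations.

```java
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

class TableDescriptorSketch {
    static HTableDescriptor describe(String table, String... families) {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
        for (String cf : families) {
            desc.addFamily(new HColumnDescriptor(cf));
        }
        // Illustrative values: split a region once its largest store grows
        // past 10 GB, and flush a memstore once it reaches 128 MB.
        desc.setMaxFileSize(10L * 1024 * 1024 * 1024);
        desc.setMemStoreFlushSize(128L * 1024 * 1024);
        return desc;
    }

    public static void main(String[] args) {
        HTableDescriptor d = describe("jiangtao:test", "i", "j");
        System.out.println(d.hasFamily(Bytes.toBytes("i")));
        System.out.println(d.getFamilies().size());
    }
}
```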

HColumnDescriptor

org.apache.hadoop.hbase.HColumnDescriptor
An HColumnDescriptor contains information about a column family such as the number of versions, compression settings, etc. It is used as input when creating a table or adding a column.

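This class describes a column family: its number of versions, compression type, and so on. It is used when creating a table or adding a column family.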

HColumnDescriptor family = new HColumnDescriptor(cf);
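A sketch of the per-family settings named above: versions, compression, and time-to-live. The values (3 versions, GZ, one day) and the helper name are illustrative assumptions; the chosen codec must also be usable on the region servers when the table is created. To add such a family to an existing table, 1.x offers admin.addColumn(tableName, family).

```java
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;

class FamilySketch {
    // Keeps up to three versions per cell, compresses store files with GZ,
    // and expires cells after one day (TTL is given in seconds).
    static HColumnDescriptor family(String name) {
        return new HColumnDescriptor(name)
                .setMaxVersions(3)
                .setCompressionType(Compression.Algorithm.GZ)
                .setTimeToLive(24 * 60 * 60);
    }

    public static void main(String[] args) {
        System.out.println(family("i"));
    }
}
```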

Put

org.apache.hadoop.hbase.client.Put
Used to perform Put operations for a single row.
To perform a Put, instantiate a Put object with the row to insert to, and for each column to be inserted, execute add(byte[], byte[], byte[]), or add(byte[], byte[], long, byte[]) if setting the timestamp.

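When writing data you can insert rows one at a time or in batches. For a batch, create a List and add the Put objects to it. (In 1.x the add(...) methods are deprecated in favor of addColumn(...), which is what the code below uses.)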

Table table = connection.getTable(tableName);
List<Put> batPut = new ArrayList<Put>();
for (int i = 0; i < 10; i++) {
    Put put = new Put(Bytes.toBytes("rowkey_" + i));  // rowkey to insert
    put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("username"), Bytes.toBytes("un_" + i)); // column family, column, value
    batPut.add(put);
}
table.put(batPut);  // one call sends the whole batch
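The batch pattern above, split into an offline part that builds the Put list and a part that writes it with a Table opened and closed per call. Helper names and the age values are hypothetical. For continuous high-volume writes, Connection.getBufferedMutator(TableName) is the 1.x alternative to accumulating a List by hand.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class PutSketch {
    private static final byte[] CF = Bytes.toBytes("i");

    // Builds n Puts, one per row "rowkey_<i>", each with two columns in family "i".
    static List<Put> buildPuts(int n) {
        List<Put> puts = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            Put put = new Put(Bytes.toBytes("rowkey_" + i));
            put.addColumn(CF, Bytes.toBytes("username"), Bytes.toBytes("un_" + i));
            put.addColumn(CF, Bytes.toBytes("age"), Bytes.toBytes(20 + i)); // stored as a 4-byte int
            puts.add(put);
        }
        return puts;
    }

    // Writes the list with a single Table.put(List) call; the client groups
    // the Puts by region server internally. The Table is closed afterwards.
    static void write(Connection conn, TableName name, List<Put> puts) throws IOException {
        try (Table table = conn.getTable(name)) {
            table.put(puts);
        }
    }
}
```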

Get

org.apache.hadoop.hbase.client.Get
Used to perform Get operations on a single row.
To get everything for a row, instantiate a Get object with the row to get. To further narrow the scope of what to Get, use the methods below.
To get all columns from specific families, execute addFamily for each family to retrieve.
To get specific columns, execute addColumn for each column to retrieve.
To only retrieve columns within a specific range of version timestamps, execute setTimeRange.
To only retrieve columns with a specific timestamp, execute setTimestamp.
To limit the number of versions of each column to be returned, execute setMaxVersions.
To add a filter, call setFilter.

Get wraps the parameters of a read request, such as the rowkey and filters.

List<Get> gets = new ArrayList<Get>(); // batch of requests
for (int i = 0; i < 5; i++) {
    gets.add(new Get(Bytes.toBytes("rowkey_" + i))); // rowkey to look up
}
Result[] results = table.get(gets);  // one Result per Get, in the same order
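Following the Javadoc list above, a Get can be narrowed to particular columns, versions, and a time range. The row, column, limits, and helper name are illustrative assumptions. Asking for 3 versions only returns more than one if the column family keeps more than one (the 1.x default is 1). Both setters declare IOException, which the sketch wraps so the helper can be called from plain code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

class GetSketch {
    static Get narrowGet(String row, long minTs, long maxTs) {
        Get get = new Get(Bytes.toBytes(row));
        get.addColumn(Bytes.toBytes("i"), Bytes.toBytes("username")); // only column i:username
        try {
            get.setMaxVersions(3);          // up to 3 versions of that column
            get.setTimeRange(minTs, maxTs); // [minTs, maxTs): the upper bound is exclusive
        } catch (IOException e) {
            // thrown for an invalid version count or time range
            throw new UncheckedIOException(e);
        }
        return get;
    }

    public static void main(String[] args) {
        System.out.println(narrowGet("rowkey_0", 0L, System.currentTimeMillis()));
    }
}
```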

Result

org.apache.hadoop.hbase.client.Result
Single row result of a Get or Scan query.
This class is NOT THREAD SAFE.
Convenience methods are available that return various Map structures and values directly.
To get a complete mapping of all cells in the Result, which can include multiple families and multiple versions, use getMap().
To get a mapping of each family to its columns (qualifiers and values), including only the latest version of each, use getNoVersionMap(). To get a mapping of qualifiers to latest values for an individual family use getFamilyMap(byte[]).
To get the latest value for a specific family and qualifier use getValue(byte[], byte[]). A Result is backed by an array of Cell objects, each representing an HBase cell defined by the row, family, qualifier, timestamp, and value.
The underlying Cell objects can be accessed through the method listCells(). This will create a List from the internal Cell []. Better is to exploit the fact that a new Result instance is a primed CellScanner; just call advance() and current() to iterate over Cells as you would any CellScanner. Call cellScanner() to reset should you need to iterate the same Result over again (CellScanners are one-shot). If you need to overwrite a Result with another Result instance – as in the old ‘mapred’ RecordReader next invocations – then create an empty Result with the null constructor and then use copyFrom(Result)

A non-thread-safe class that holds the result HBase returns for a single row.

Result[] results = table.get(gets);
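The accessors from the Javadoc (getValue, getFamilyMap) can be tried without a cluster by assembling a Result from hand-built KeyValue cells; this is only an illustration device, since real Results come from Table.get. The detail worth noticing is decoding: a value written with Bytes.toBytes(int) must be read back with Bytes.toInt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

class ResultSketch {
    static final byte[] ROW = Bytes.toBytes("rowkey_0");
    static final byte[] CF = Bytes.toBytes("i");
    static final byte[] AGE = Bytes.toBytes("age");
    static final byte[] USERNAME = Bytes.toBytes("username");

    // Hand-built Result for offline experiments. Cells are listed in HBase
    // sort order ("age" before "username"), which getValue relies on.
    static Result sample() {
        List<Cell> cells = Arrays.<Cell>asList(
                new KeyValue(ROW, CF, AGE, 1L, Bytes.toBytes(30)),
                new KeyValue(ROW, CF, USERNAME, 1L, Bytes.toBytes("un_0")));
        return Result.create(cells);
    }

    public static void main(String[] args) {
        Result r = sample();
        // getValue returns the latest version's raw bytes; decode with the
        // Bytes method that matches how the value was encoded.
        int age = Bytes.toInt(r.getValue(CF, AGE));
        String name = Bytes.toString(r.getValue(CF, USERNAME));
        // getFamilyMap: qualifier -> latest value, for one family
        NavigableMap<byte[], byte[]> family = r.getFamilyMap(CF);
        System.out.println(name + " " + age + " " + family.size()); // un_0 30 2
    }
}
```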

CellScanner

org.apache.hadoop.hbase.CellScanner

CellScanner cellScanner = result.cellScanner();
while (cellScanner.advance()) {
    Cell cell = cellScanner.current();
    // Extract each field from the cell with the CellUtil helper and print it
    String family = Bytes.toString(CellUtil.cloneFamily(cell));
    String qualify = Bytes.toString(CellUtil.cloneQualifier(cell));
    String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
    String value = Bytes.toString(CellUtil.cloneValue(cell));
    System.out.println("rowkey:" + rowkey + ",columnfamily:" + family + ",qualify:" + qualify + ",value:" + value);
}
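A self-contained version of this loop that also handles a decoding pitfall: the full program writes age with Bytes.toBytes(int), so printing every value with Bytes.toString shows garbage for that column. The hand-built Result and the hypothetical format helper are for illustration only; calling result.cellScanner() again rewinds the scanner, as the Result Javadoc explains.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

class CellScannerSketch {
    // Formats every cell as "row/family:qualifier=value". The "age" column is
    // decoded with Bytes.toInt; everything else is treated as a string.
    static List<String> format(Result result) {
        List<String> out = new ArrayList<>();
        CellScanner scanner = result.cellScanner(); // rewinds to the first cell
        try {
            while (scanner.advance()) {
                Cell cell = scanner.current();
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                byte[] raw = CellUtil.cloneValue(cell);
                String value = "age".equals(qualifier) ? String.valueOf(Bytes.toInt(raw)) : Bytes.toString(raw);
                out.add(Bytes.toString(CellUtil.cloneRow(cell)) + "/"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + qualifier + "=" + value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    // Hand-built Result standing in for one returned by table.get(...).
    static Result sample() {
        byte[] row = Bytes.toBytes("rowkey_0");
        byte[] cf = Bytes.toBytes("i");
        return Result.create(new Cell[] {
                new KeyValue(row, cf, Bytes.toBytes("age"), 1L, Bytes.toBytes(30)),
                new KeyValue(row, cf, Bytes.toBytes("username"), 1L, Bytes.toBytes("un_0")) });
    }

    public static void main(String[] args) {
        System.out.println(format(sample())); // [rowkey_0/i:age=30, rowkey_0/i:username=un_0]
    }
}
```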

Cell

org.apache.hadoop.hbase.Cell
The unit of storage in HBase consisting of the following fields:
1) row
2) column family
3) column qualifier
4) timestamp
5) type
6) MVCC version
7) value

In short, a Cell is the smallest unit of a result.
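To see the fields listed above on a concrete cell, the sketch below prints them for a hand-built KeyValue (an illustration; the timestamp and values are made up, and the helper names are hypothetical). The MVCC version is exposed as getSequenceId() in 1.x; for a cell built on the client it is typically 0, because the server assigns it.

```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

class CellFieldsSketch {
    // Renders the cell's row, family, qualifier, timestamp, type,
    // sequence id (MVCC version) and value on one line.
    static String describe(Cell cell) {
        return "row=" + Bytes.toString(CellUtil.cloneRow(cell))
                + " family=" + Bytes.toString(CellUtil.cloneFamily(cell))
                + " qualifier=" + Bytes.toString(CellUtil.cloneQualifier(cell))
                + " timestamp=" + cell.getTimestamp()
                + " type=" + KeyValue.Type.codeToType(cell.getTypeByte())
                + " seqId=" + cell.getSequenceId()
                + " value=" + Bytes.toString(CellUtil.cloneValue(cell));
    }

    static Cell sample() {
        return new KeyValue(Bytes.toBytes("rowkey_0"), Bytes.toBytes("j"),
                Bytes.toBytes("email"), 1500000000000L, Bytes.toBytes("email_0"));
    }

    public static void main(String[] args) {
        System.out.println(describe(sample()));
    }
}
```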

Below is the complete code.

package com.jiangtao.asiainfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * HBase operation: create a table
 * 1. HBaseConfiguration.create(): obtain the configuration conf
 * 2. conf.set(): set ZooKeeper and other parameters (Kerberos authentication, etc.)
 * 3. ConnectionFactory.createConnection(configuration): obtain the connection conn
 * 4. conn.getAdmin(): obtain Admin, the class for table operations (constructing HBaseAdmin directly is deprecated)
 * 5. Create a TableName describing the table name: TableName tname = TableName.valueOf(tablename);
 * 6. Create the table descriptor: HTableDescriptor tDescriptor = new HTableDescriptor(tname);
 * 7. Create a column family descriptor: HColumnDescriptor family = new HColumnDescriptor(cf);
 * 8. Add the column family descriptor to the table descriptor: tDescriptor.addFamily(family);
 * 9. Create the table through admin: admin.createTable(tDescriptor);
 *
 * HBase operation: insert data
 * 1. HBaseConfiguration.create(): obtain the configuration conf
 * 2. conf.set(): set ZooKeeper and other parameters (Kerberos authentication, etc.)
 * 3. ConnectionFactory.createConnection(configuration): obtain the connection conn
 * 4. Create a TableName describing the table name: TableName tname = TableName.valueOf(tablename);
 * 5. Obtain the table object through conn: Table table = connection.getTable(tname);
 * 6.1. Single-row insert: table.put(put)
 * 6.2. Batch insert, first collecting the Put objects in a list: List<Put> batPut = new ArrayList<Put>();
 *          Put put = new Put(Bytes.toBytes("rowkey_" + i));  // rowkey to insert
 *          put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("username"), Bytes.toBytes("un_" + i)); // column family, column, value
 *          batPut.add(put);
 *          table.put(batPut);
 *
 * HBase operation: read data
 * 1. HBaseConfiguration.create(): obtain the configuration conf
 * 2. conf.set(): set ZooKeeper and other parameters (Kerberos authentication, etc.)
 * 3. ConnectionFactory.createConnection(configuration): obtain the connection conn
 * 4. Create a TableName describing the table name: TableName tname = TableName.valueOf(tablename);
 * 5. Obtain the table object through conn: Table table = connection.getTable(tname);
 * 6. List<Get> gets = new ArrayList<Get>(); // batch of requests
 *      Get get = new Get(Bytes.toBytes("rowkey_" + i)); // rowkey to look up
 *      gets.add(get);
 * 7. Result[] results = table.get(gets);  // receive the data as Result[]
 * 8. Iterate over the cells with CellScanner cellScanner = result.cellScanner();
 *      while (cellScanner.advance()) {
 *          Cell cell = cellScanner.current();
 *          // extract the data from the cell with the CellUtil helper and print it
 *          String family = Bytes.toString(CellUtil.cloneFamily(cell));
 *          String qualify = Bytes.toString(CellUtil.cloneQualifier(cell));
 *          String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
 *          String value = Bytes.toString(CellUtil.cloneValue(cell));
 *          System.out.println("rowkey:" + rowkey + ",columnfamily:" + family + ",qualify:" + qualify + ",value:" + value);
 *      }
 * @author jiangtao
 *
 */
public class HbaseTest {
    public Connection connection;
    // HBaseConfiguration.create() automatically loads hbase-site.xml from the application's classpath
    public static Configuration configuration = HBaseConfiguration.create();
    public Table table;

    public HbaseTest() throws Exception {
        // Older code did: new HBaseAdmin(configuration); that constructor is deprecated, use Admin instead
        configuration.set("hbase.zookeeper.quorum", "bd36,bd37,bd38,bd66,bd67");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("zookeeper.znode.parent", "/hbase-unsecure");
        // Create the single connection shared by all operations; Admin and Table are opened per operation
        connection = ConnectionFactory.createConnection(configuration);
    }
    // Create a table
    public void createTable(String tablename, String... cf1) throws Exception {
        // Admin is lightweight and Closeable: open one per operation and close it afterwards
        try (Admin admin = connection.getAdmin()) {
            // TableName describes the table's name, e.g. bd17:mytable (namespace:table)
            TableName tname = TableName.valueOf(tablename);
            // Check whether the table already exists
            if (admin.tableExists(tname)) {
                System.out.println("Table " + tablename + " already exists");
                return;
            }
            // HTableDescriptor describes the table
            HTableDescriptor tDescriptor = new HTableDescriptor(tname);
            // Add the column family descriptors
            for (String cf : cf1) {
                HColumnDescriptor family = new HColumnDescriptor(cf);
                tDescriptor.addFamily(family);
            }
            // Create the table through admin
            admin.createTable(tDescriptor);
            System.out.println("Table " + tablename + " created");
        }
    }
    // Delete a table
    public void deleteTable(String tablename) throws Exception {
        try (Admin admin = connection.getAdmin()) {
            TableName tName = TableName.valueOf(tablename);
            if (admin.tableExists(tName)) {
                // A table must be disabled before it can be deleted
                if (admin.isTableEnabled(tName)) {
                    admin.disableTable(tName);
                }
                admin.deleteTable(tName);
                System.out.println("Table " + tablename + " deleted");
            } else {
                System.out.println("Table " + tablename + " does not exist");
            }
        }
    }
    // Insert data into the table with Put
    public void putData(String table_name) throws Exception {
        TableName tableName = TableName.valueOf(table_name);
        // Table is lightweight and not thread-safe: open it per operation and close it afterwards
        try (Table table = connection.getTable(tableName)) {
            Random random = new Random();
            List<Put> batPut = new ArrayList<Put>();
            for (int i = 0; i < 10; i++) {
                // The Put is built from the rowkey rowkey_i (Bytes converts between Java primitive types and byte arrays)
                Put put = new Put(Bytes.toBytes("rowkey_" + i));
                put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("username"), Bytes.toBytes("un_" + i));
                // age is stored as a 4-byte int, so it must be read back with Bytes.toInt
                put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("age"), Bytes.toBytes(random.nextInt(50) + 1));
                put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("birthday"), Bytes.toBytes("20170" + i + "01"));
                put.addColumn(Bytes.toBytes("j"), Bytes.toBytes("phone"), Bytes.toBytes("phone_" + i));
                put.addColumn(Bytes.toBytes("j"), Bytes.toBytes("email"), Bytes.toBytes("email_" + i));
                // Single-row put:
//                table.put(put);
                batPut.add(put);
            }
            table.put(batPut);
        }
        System.out.println("Data inserted successfully!");
    }
    public void getData(String table_Name) throws Exception {
        TableName tableName = TableName.valueOf(table_Name);
        try (Table table = connection.getTable(tableName)) {
            // Build the Get objects
            List<Get> gets = new ArrayList<Get>();
            for (int i = 0; i < 5; i++) {
                Get get = new Get(Bytes.toBytes("rowkey_" + i));
                gets.add(get);
            }
            Result[] results = table.get(gets);
            for (Result result : results) {
                // Option 1: read the row through the full map (family -> qualifier -> timestamp -> value)
//                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> maps = result.getMap();
//                for (byte[] cf : maps.keySet()) {
//                    NavigableMap<byte[], NavigableMap<Long, byte[]>> valueWithColumnQualify = maps.get(cf);
//                    for (byte[] columnQualify : valueWithColumnQualify.keySet()) {
//                        NavigableMap<Long, byte[]> valueWithTimeStamp = valueWithColumnQualify.get(columnQualify);
//                        for (Long ts : valueWithTimeStamp.keySet()) {
//                            byte[] value = valueWithTimeStamp.get(ts);
//                            System.out.println("rowkey:" + Bytes.toString(result.getRow()) + ",columnFamily:"
//                                    + Bytes.toString(cf) + ",columnQualify:" + Bytes.toString(columnQualify) + ",timestamp:"
//                                    + new Date(ts) + ",value:" + Bytes.toString(value));
//                        }
//                    }
//                }

                // Option 2: fetch a value directly by column family and column qualifier
//                System.out.println("rowkey:" + Bytes.toString(result.getRow()) + ",columnfamily:i,columnqualify:username,value:"
//                        + Bytes.toString(result.getValue(Bytes.toBytes("i"), Bytes.toBytes("username"))));
//                System.out.println("rowkey:" + Bytes.toString(result.getRow()) + ",columnfamily:i,columnqualify:age,value:"
//                        + Bytes.toInt(result.getValue(Bytes.toBytes("i"), Bytes.toBytes("age"))));

                // Option 3: iterate over the cells in the Result
                CellScanner cellScanner = result.cellScanner();
                while (cellScanner.advance()) {
                    Cell cell = cellScanner.current();
                    // Extract each field from the cell with the CellUtil helper and print it
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualify = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                    byte[] raw = CellUtil.cloneValue(cell);
                    // age was written with Bytes.toBytes(int); decoding it as a String would print garbage
                    String value = "age".equals(qualify) ? String.valueOf(Bytes.toInt(raw)) : Bytes.toString(raw);
                    System.out.println("rowkey:" + rowkey + ",columnfamily:" + family + ",qualify:" + qualify + ",value:" + value);
                }
            }
        }
    }
    // Close the connection
    public void cleanUp() throws Exception{
        connection.close();
    }
    public static void main(String[] args) throws Exception {
        HbaseTest hbaseTest = new HbaseTest();
        hbaseTest.createTable("jiangtao:test", "i", "j"); // namespace "jiangtao" must already exist (HBase shell: create_namespace 'jiangtao')
        hbaseTest.putData("jiangtao:test");
        hbaseTest.getData("jiangtao:test");
        hbaseTest.cleanUp();
    }
}

Original blog post; please respect the author's work. Thanks!