1. 程式人生 > >大資料儲存---HBase常用介紹(中)

大資料儲存---HBase常用介紹(中)

我們這裡主要介紹HBase的API

  • 基礎API
  • 封裝工具類

基礎API

  • 建立表
  • 新增資料
  • 查詢資料的三種方式
    • 掃描查詢
    • get方式執行查詢
    • 過濾查詢

PS:刪除表請透過 shell 命令進入客戶端後執行刪除。
package com.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class HBaseProcess {


    //建立表
    @Test
    public void creatHbase() throws Exception {

        //建立一個連線
        Configuration entries = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection();

        ////建立一個表需要的管理許可權,他來建立表
        Admin admin = connection.getAdmin();
        //建立表名
        TableName tableName = TableName.valueOf("user");

        if (!admin.tableExists(tableName)) {

            //建立一個表
            HTableDescriptor hTable = new HTableDescriptor(tableName);

            //新增列簇
            hTable.addFamily(new HColumnDescriptor("base_info"));
            System.out.println("------------開始建立表------------");
            //建立表
            admin.createTable(hTable);
        }


        //新增資料
        System.out.println("-----------------開始新增資料--------------------");
        //獲取表
        Table user = connection.getTable(TableName.valueOf("user"));

        //使用Put類
        byte[] rewkey_10s = Bytes.toBytes("rewkey_10");
        Put put = new Put(rewkey_10s);

        byte[] family = Bytes.toBytes("base_info");
        byte[] nameField = Bytes.toBytes("username");
        byte[] nameValue = Bytes.toBytes("zhangsan");
        put.addColumn(family,nameField,nameValue);

        byte[] sexField = Bytes.toBytes("sex");
        byte[] sexValue = Bytes.toBytes("1");
        put.addColumn(family, sexField, sexValue);

        byte[] birField = Bytes.toBytes("birthday");
        byte[] birValue = Bytes.toBytes("2014-07-10");
        put.addColumn(family, birField, birValue);

        byte[] addrField = Bytes.toBytes("address");
        byte[] addrValue = Bytes.toBytes("北京市");
        put.addColumn(family, addrField, addrValue);

        user.put(put);


        //獲取資料的三種方式
        //方式一,
        Table user1 = connection.getTable(TableName.valueOf("user"));

        Get get = new Get(Bytes.toBytes("rewkey_10"));

        Result result = user1.get(get);

        List<Cell> cellList = result.listCells();
        for (Cell cell : cellList) {
            System.out.println(Bytes.toString(CellUtil.cloneRow(cell))
                    + "==> " + Bytes.toString(CellUtil.cloneFamily(cell))
                    + "{" + Bytes.toString(CellUtil.cloneQualifier(cell))
                    + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}");
        }

        user1.close();

        //獲取方式2,全表掃描
        System.out.println("-----------掃描全表的資料-------------");
        Scan scan = new Scan();
        /**
         * 新增資料篩選的範圍
         */
        scan.setStartRow(Bytes.toBytes("rowkey_10"));
        scan.setStopRow(Bytes.toBytes("rowkey_22"));

        Table user2 = connection.getTable(TableName.valueOf("user"));
        ResultScanner scanner = user2.getScanner(scan);
        Result result1 = null;
        while ((result1 = scanner.next())!=null){
            List<Cell> cells1 = result1.listCells();
            for (Cell cell : cells1) {
                // 列簇、列名、值、rowkey
                // 列印rowkey,family,qualifier,value
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell))
                        + "==> " + Bytes.toString(CellUtil.cloneFamily(cell))
                        + "{" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}");
            }
        }
        user1.close();

        System.out.println("-------------------查詢住在北京的使用者----------------");

        //定義好過濾器
        ValueFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("北京市"));

        Scan scan1 = new Scan();
        scan1.setFilter(filter);

        //定義好過濾器可以使用了
        Table user3 = connection.getTable(TableName.valueOf("user"));
        ResultScanner scanner1 = user3.getScanner(scan1);
        Result result2=null;

        while ((result2=scanner1.next())!=null){
            List<Cell> cells = result2.listCells();
            for (Cell cell : cells) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell))
                        + "==> " + Bytes.toString(CellUtil.cloneFamily(cell))
                        + "{" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + ":" + Bytes.toString(CellUtil.cloneValue(cell)) + "}");
            }
        }
    }
}

封裝工具類

package com.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;


/**
 * Static helpers around the HBase client API: create a table, write a cell,
 * and read rows by scan or by get.
 *
 * <p>All helpers share one lazily created {@link Connection}. Tables, admins
 * and scanners obtained from it are closed before each helper returns.
 */
public class HBaseUtils {

    /** Shared connection, created on first use by {@link #getConnection()}. */
    private static Connection connection;

    /**
     * Creates a table with the given column families; does nothing if the
     * table already exists.
     *
     * @param tName       table name
     * @param familyNames names of the column families to create
     * @throws Exception if the connection cannot be obtained or an admin call fails
     */
    public static void createTable(String tName, String... familyNames) throws Exception {
        TableName tableName = TableName.valueOf(tName);
        // try-with-resources: the admin handle is released even if createTable throws.
        try (Admin admin = getConnection().getAdmin()) {
            if (admin.tableExists(tableName)) {
                return;
            }
            HTableDescriptor descriptor = new HTableDescriptor(tableName);
            for (String familyName : familyNames) {
                descriptor.addFamily(new HColumnDescriptor(familyName));
            }
            admin.createTable(descriptor);
        }
    }

    /**
     * Writes a single cell.
     *
     * <p>This method used to be {@code private} and unused, which left the
     * utility with no way to write data; it is now public.
     *
     * @param tableName table to write to
     * @param rowkey    row key
     * @param baseInfo  column family name
     * @param Field     column qualifier
     * @param Value     cell value
     * @throws Exception if the connection cannot be obtained or the put fails
     */
    public static void putData(String tableName, String rowkey, String baseInfo, String Field, String Value) throws Exception {
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes(baseInfo), Bytes.toBytes(Field), Bytes.toBytes(Value));
            table.put(put);
        }
    }

    /**
     * Scans a table and returns every matching row.
     *
     * @param tName    table name
     * @param startKey first row key of the range, or {@code null} for no lower bound
     * @param endKey   stop row key of the range, or {@code null} for no upper bound
     * @param filter   optional filter applied to the scan, or {@code null}
     * @return one inner list per row; each inner list holds one map per cell
     *         with the keys {@code RowKey}, {@code Family}, {@code Field}, {@code Value}
     * @throws Exception if the connection cannot be obtained or the scan fails
     */
    public static ArrayList<ArrayList<Map<String, String>>> scan
          (String tName, String startKey, String endKey, Filter filter) throws Exception {

        Scan scan = new Scan();
        // Each bound is applied on its own; previously a lone start or stop key
        // was silently ignored and the whole table was scanned.
        if (startKey != null) {
            scan.setStartRow(Bytes.toBytes(startKey));
        }
        if (endKey != null) {
            scan.setStopRow(Bytes.toBytes(endKey));
        }
        if (filter != null) {
            scan.setFilter(filter);
        }

        ArrayList<ArrayList<Map<String, String>>> rows = new ArrayList<ArrayList<Map<String, String>>>();
        // Close the scanner first, then the table, even if iteration fails.
        try (Table table = getConnection().getTable(TableName.valueOf(tName));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                rows.add(getValue(result));
            }
        }
        return rows;
    }

    /**
     * Fetches one row by key, optionally narrowed to a column family or a
     * single column.
     *
     * @param tName  table name
     * @param rowkey row key to fetch
     * @param cf     column family to restrict to, or {@code null} for all families
     * @param field  column qualifier to restrict to, or {@code null}; requires {@code cf}
     * @return one map per cell (keys {@code RowKey}, {@code Family}, {@code Field},
     *         {@code Value}); empty when the row or column does not exist
     * @throws IllegalArgumentException if {@code field} is given without {@code cf}
     * @throws Exception if the connection cannot be obtained or the get fails
     */
    public static ArrayList<Map<String, String>> get(String tName, String rowkey, String cf, String field) throws Exception {
        if (field != null && cf == null) {
            // A qualifier is only meaningful inside a family; fail with a clear
            // message rather than an NPE from Bytes.toBytes(null).
            throw new IllegalArgumentException("column family is required when a field is given: field=" + field);
        }

        Get get = new Get(Bytes.toBytes(rowkey));
        if (field != null) {
            get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(field));
        } else if (cf != null) {
            get.addFamily(Bytes.toBytes(cf));
        }

        try (Table table = getConnection().getTable(TableName.valueOf(tName))) {
            return getValue(table.get(get));
        }
    }

    /**
     * Converts the cells of a {@link Result} into string maps.
     *
     * @param result result of a get or one row of a scan
     * @return one map per cell; empty if the result holds no cells
     */
    private static ArrayList<Map<String, String>> getValue(Result result) {
        ArrayList<Map<String, String>> maps = new ArrayList<Map<String, String>>();
        List<Cell> cells = result.listCells();
        if (cells == null) {
            // listCells() yields null, not an empty list, for an empty result
            // (e.g. a get on a row that does not exist).
            return maps;
        }
        for (Cell cell : cells) {
            HashMap<String, String> hashMap = new HashMap<String, String>();
            hashMap.put("RowKey", Bytes.toString(CellUtil.cloneRow(cell)));
            hashMap.put("Family", Bytes.toString(CellUtil.cloneFamily(cell)));
            hashMap.put("Field", Bytes.toString(CellUtil.cloneQualifier(cell)));
            hashMap.put("Value", Bytes.toString(CellUtil.cloneValue(cell)));
            maps.add(hashMap);
        }
        return maps;
    }

    /**
     * Returns the shared connection, creating it on first use.
     *
     * <p>Synchronized so that concurrent first callers cannot each create
     * their own connection and thread pool.
     *
     * @return the shared connection
     * @throws Exception if the connection cannot be created
     */
    private static synchronized Connection getConnection() throws Exception {
        if (connection == null) {
            Configuration entries = HBaseConfiguration.create();
            // The executor is a 30-thread pool handed to the connection for its
            // own use; it is not a cap on the number of connections.
            connection = ConnectionFactory.createConnection(entries, Executors.newFixedThreadPool(30));
        }
        return connection;
    }
}