1. 程式人生 > >hbase api常用方法使用及預分割槽解決熱點問題

hbase api常用方法使用及預分割槽解決熱點問題

API 操作:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kktest.hbase.HashChoreWoker;
import com.kktest.hbase.HashRowKeyGenerator;
import com.kktest.hbase.RowKeyGenerator;
import com.kktest.hbase.BitUtils;

/**
 * hbase 客戶端
 * 
 * @author kuang hj
 * 
 */
@SuppressWarnings("all")
public class HBaseClient {

    private static Logger logger = LoggerFactory.getLogger(HBaseClient.class);
    private static Configuration config;
    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",
                "192.168.1.100:2181,192.168.1.101:2181,192.168.1.103:2181");
    }

    /**
     * 根據隨機雜湊(hash)建立分割槽表
     * 
     * @throws Exception
     *             hash_split_table
     */
    public static void testHashAndCreateTable(String tableNameTmp,
            String columnFamily) throws Exception {<p>        // 取隨機雜湊 10 代表 10個分割槽
        HashChoreWoker worker = new HashChoreWoker(1000000, 10);
        byte[][] splitKeys = worker.calcSplitKeys();

        HBaseAdmin admin = new HBaseAdmin(config);
        TableName tableName = TableName.valueOf(tableNameTmp);

        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);
            } catch (Exception e) {
            }
            admin.deleteTable(tableName);
        }

        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(
                Bytes.toBytes(columnFamily));
        columnDesc.setMaxVersions(1);
        tableDesc.addFamily(columnDesc);

        admin.createTable(tableDesc, splitKeys);

        admin.close();
    }

    /**
     * @Title: queryData
     * @Description: 從HBase查詢出資料
     * @author kuang hj
     * @param tableName
     *            表名
     * @param rowkey
     *            rowkey
     * @return 返回使用者資訊的list
     * @throws Exception
     */
    @SuppressWarnings("all")
    public static ArrayList<String> queryData(String tableName, String rowkey)
            throws Exception {
        ArrayList<String> list = new ArrayList<String>();
        logger.info("開始時間");
        HTable table = new HTable(config, tableName);

        Get get = new Get(rowkey.getBytes()); // 根據主鍵查詢
        Result r = table.get(get);
        logger.info("結束時間");
        KeyValue[] kv = r.raw();
        for (int i = 0; i < kv.length; i++) {
            // 迴圈每一列
            String key = kv[i].getKeyString();
            
            String value = kv[i].getValueArray().toString();
            
            // 將查詢到的結果寫入List中
            list.add(key + ":"+ value);
            
        }// end of 遍歷每一列
        
        return list;
    }

    /**
     * 增加表資料
     * 
     * @param tableName
     * @param rowkey
     */
    public static void insertData(String tableName, String rowkey) {
        HTable table = null;
        try {
            table = new HTable(config, tableName);
            // 一個PUT代表一行資料,再NEW一個PUT表示第二行資料,每行一個唯一的ROWKEY,此處rowkey為put構造方法中傳入的值
            for (int i = 1; i < 100; i++) {
                byte[] result = getNumRowkey(rowkey,i);
                Put put = new Put(result);
                // 本行資料的第一列
                put.add(rowkey.getBytes(), "name".getBytes(),
                        ("aaa" + i).getBytes());
                // 本行資料的第三列
                put.add(rowkey.getBytes(), "age".getBytes(),
                        ("bbb" + i).getBytes());
                // 本行資料的第三列
                put.add(rowkey.getBytes(), "address".getBytes(),
                        ("ccc" + i).getBytes());

                table.put(put);
            }

        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }

    private static byte[] getNewRowkey(String rowkey) {
        byte[] result = null;

        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();

        byte[] rowkeytmp = rowkey.getBytes();

        result = new byte[splitKeys.length + rowkeytmp.length];
        System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
        System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
                rowkeytmp.length);

        return result;
    }
    
    public static void main(String[] args) {
        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();
        System.out.println(splitKeys);    
    }

    private static byte[] getNumRowkey(String rowkey, int i) {
        byte[] result = null;

        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();

        byte[] rowkeytmp = rowkey.getBytes();

        byte[] intVal = BitUtils.getByteByInt(i);
        result = new byte[splitKeys.length + rowkeytmp.length + intVal.length];
        System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
        System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
                rowkeytmp.length);
        System.arraycopy(intVal, 0, result, splitKeys.length+rowkeytmp.length,
                intVal.length);

        return result;
    }
    
    

    /**
     * 刪除表
     * 
     * @param tableName
     */
    public static void dropTable(String tableName) {
        try {
            HBaseAdmin admin = new HBaseAdmin(config);
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 查詢所有
     * 
     * @param tableName
     */
    public static void QueryAll(String tableName) {
        HTable table  = null;
        try {
            table  = new HTable(config, tableName);
            ResultScanner rs = table.getScanner(new Scan());
            for (Result r : rs) {
                System.out.println("獲得到rowkey:" + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("列:" + new String(keyValue.getFamily())
                            + "====值:" + new String(keyValue.getValue()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 查詢所有
     * 
     * @param tableName
     */
    public static void QueryByCondition1(String tableName) {

        HTable table = null;
        try {
            table  = new HTable(config, tableName);
            Get scan = new Get("abcdef".getBytes());// 根據rowkey查詢
            Result r = table.get(scan);
            System.out.println("獲得到rowkey:" + new String(r.getRow()));
            for (KeyValue keyValue : r.raw()) {
                System.out.println("列:" + new String(keyValue.getFamily())
                        + "====值:" + new String(keyValue.getValue()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     *  根據rowkwy前墜查詢 
     * @param tableName
     * @param rowkey
     */
    public static void queryByRowKey(String tableName,String rowkey)
    {
        try {
            HTable table = new HTable(config, tableName);
            Scan scan = new Scan();
            scan.setFilter(new PrefixFilter(rowkey.getBytes()));
            ResultScanner rs = table.getScanner(scan);
            KeyValue[] kvs = null;
            for (Result tmp : rs)
            {
                kvs = tmp.raw();
                for (KeyValue kv : kvs)
                {
                    System.out.print(kv.getRow()+" ");
                    System.out.print(kv.getFamily()+" :");
                    System.out.print(kv.getQualifier()+" ");
                    System.out.print(kv.getTimestamp()+" ");
                    System.out.println(kv.getValue());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }
    /**
     * 查詢所有
     * 
     * @param tableName
     */
    public static void QueryByCondition2(String tableName) {

        try {
            HTable table = new HTable(config, tableName);
            // 當列column1的值為aaa時進行查詢
            Filter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("aaa")); 
            Scan s = new Scan();
            s.setFilter(filter);
            ResultScanner rs = table.getScanner(s);
            for (Result r : rs) {
                System.out.println("獲得到rowkey:" + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("列:" + new String(keyValue.getFamily())
                            + "====值:" + new String(keyValue.getValue()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 查詢所有
     * 
     * @param tableName
     */
    public static void QueryByCondition3(String tableName) {

        try {
            
            HTable table = new HTable(config, tableName);

            List<Filter> filters = new ArrayList<Filter>();

            Filter filter1 = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("aaa"));
            filters.add(filter1);

            Filter filter2 = new SingleColumnValueFilter(
                    Bytes.toBytes("column2"), null, CompareOp.EQUAL,
                    Bytes.toBytes("bbb"));
            filters.add(filter2);

            Filter filter3 = new SingleColumnValueFilter(
                    Bytes.toBytes("column3"), null, CompareOp.EQUAL,
                    Bytes.toBytes("ccc"));
            filters.add(filter3);

            FilterList filterList1 = new FilterList(filters);

            Scan scan = new Scan();
            scan.setFilter(filterList1);
            ResultScanner rs = table.getScanner(scan);
            for (Result r : rs) {
                System.out.println("獲得到rowkey:" + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("列:" + new String(keyValue.getFamily())
                            + "====值:" + new String(keyValue.getValue()));
                }
            }
            rs.close();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}</p>
HashChoreWoker:

import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.hbase.util.Bytes;

/**
 * 
 * @author kuang hj
 *
 */
public class HashChoreWoker{
    // 隨機取機數目
    private int baseRecord;
    // rowkey生成器
    private RowKeyGenerator rkGen;
    // 取樣時,由取樣數目及region數相除所得的數量.
    private int splitKeysBase;
    // splitkeys個數
    private int splitKeysNumber;
    // 由抽樣計算出來的splitkeys結果
    private byte[][] splitKeys;

    public HashChoreWoker(int baseRecord, int prepareRegions) {
        this.baseRecord = baseRecord;
        // 例項化rowkey生成器
        rkGen = new HashRowKeyGenerator();
        splitKeysNumber = prepareRegions - 1;
        splitKeysBase = baseRecord / prepareRegions;
    }

    public byte[][] calcSplitKeys() {
        splitKeys = new byte[splitKeysNumber][];
        // 使用treeset儲存抽樣資料,已排序過
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < baseRecord; i++) {
            rows.add(rkGen.nextId());
        }
        int pointer = 0;
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int index = 0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            if ((pointer != 0) && (pointer % splitKeysBase == 0)) {
                if (index < splitKeysNumber) {
                    splitKeys[index] = tempRow;
                    index++;
                }
            }
            pointer++;
        }
        rows.clear();
        rows = null;
        return splitKeys;
    }
}


HashRowKeyGenerator:
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

import com.kktest.hbase.BitUtils;
/**
*
*
**/
public class HashRowKeyGenerator implements RowKeyGenerator {
    private static long currentId = 1;
    private static long currentTime = System.currentTimeMillis();
    //private static Random random = new Random();

    public byte[] nextId() 
    {
        try {
            currentTime = getRowKeyResult(Long.MAX_VALUE - currentTime);
            byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4);
            byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4);
            byte[] result = Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowT, lowU))
                    .substring(0, 8).getBytes(), Bytes.toBytes(currentId));
            return result;
        } finally {
            currentId++;
        }
    }
    
    /**
     *  getRowKeyResult
     * @param tmpData
     * @return
     */
    public static long getRowKeyResult(long tmpData)
    {
        String str = String.valueOf(tmpData);
        StringBuffer sb = new StringBuffer();
        char[] charStr = str.toCharArray();
        for (int i = charStr.length -1 ; i > 0; i--)
        {
            sb.append(charStr[i]);
        }
        
        return Long.parseLong(sb.toString());
    }
}




</pre><pre name="code" class="java">