1. 程式人生 > >java實現hbase資料庫的增刪改查操作(新API)

java實現hbase資料庫的增刪改查操作(新API)

操作環境:

   java版本:    jdk 1.7以上

   hbase 版本:1.2.x

   hadoop版本:2.6.0以上

實現功能: 1,建立指定表

       2,刪除指定表

      3,根據表名,行鍵,列族,列描述符,值插入資料

      4,根據指定表獲取指定行鍵rowkey和列族family的資料 並以字串的形式返回查詢到的結果

      5,根據table查詢表中的所有資料 無返回值,直接在控制檯列印結果

      7,根據指定表獲取指定行鍵rowKey和列族family的資料 並以Map集合的形式返回查詢到的結果

      8,根據指定表獲取指定行鍵rowKey的所有資料 並以Map集合的形式返回查詢到的結果

      9,根據表名獲取所有的資料


package com.hbase.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtil{

	/**
	 * 連線池物件
	 */
	protected static Connection connection;
	private static final String ZK_QUORUM = "hbase.zookeeper.quorum";
	private static final String ZK_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
	/**
	 * HBase位置
	 */
	private static final String HBASE_POS = "192.168.1.104";

	/**
	 * ZooKeeper位置
	 */
	private static final String ZK_POS = "localhost";

	/**
	 * zookeeper服務埠
	 */
	private final static String ZK_PORT_VALUE = "2181";

	/**
	 * 靜態構造,在呼叫靜態方法時前進行執行
	 * 初始化連線物件.
	 * */
	static{
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.rootdir", "hdfs://" + HBASE_POS
				+ ":9000/hbase");
		configuration.set(ZK_QUORUM, ZK_POS);
		configuration.set(ZK_CLIENT_PORT, ZK_PORT_VALUE);
		try {
			connection = ConnectionFactory.createConnection(configuration);
		} catch (IOException e) {
			e.printStackTrace();
		}// 建立連線池
	}
	/**
	 * 建構函式,用於初始化內建物件
	 */
	public HBaseUtil() {
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.rootdir", "hdfs://" + HBASE_POS
				+ ":9000/hbase");
		configuration.set(ZK_QUORUM, ZK_POS);
		configuration.set(ZK_CLIENT_PORT, ZK_PORT_VALUE);
		try {
			connection = ConnectionFactory.createConnection(configuration);
		} catch (IOException e) {
			e.printStackTrace();
		}// 建立連線池
	}

	/**
	 * @param tableName
	 *            建立一個表 tableName 指定的表名  seriesStr
	 * @param seriesStr
	 *            以字串的形式指定表的列族,每個列族以逗號的形式隔開,(例如:"f1,f2"兩個列族,分別為f1和f2)
	 **/
	public boolean createTable(String tableName, String seriesStr) {
		boolean isSuccess = false;// 判斷是否建立成功!初始值為false
		Admin admin = null;
		TableName table = TableName.valueOf(tableName);
		try {
			admin = connection.getAdmin();
			if (!admin.tableExists(table)) {
				System.out.println("INFO:Hbase::  " + tableName + "原資料庫中表不存在!開始建立...");
				HTableDescriptor descriptor = new HTableDescriptor(table);
				String[] series = seriesStr.split(",");
				for (String s : series) {
					descriptor.addFamily(new HColumnDescriptor(s.getBytes()));
				}
				admin.createTable(descriptor);
				System.out.println("INFO:Hbase::  "+tableName + "新的" + tableName + "表建立成功!");
				isSuccess = true;
			} else {
				System.out.println("INFO:Hbase::  該表已經存在,不需要在建立!");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(admin);
		}
		return isSuccess;
	}
	
	/**
	 * 刪除指定表名的表
	 * @param tableName  表名
	 * @throws IOException 
	 * */
	public boolean dropTable(String tableName) throws IOException {
		boolean isSuccess = false;// 判斷是否建立成功!初始值為false
		Admin admin = null;
		TableName table = TableName.valueOf(tableName);
		try {
			admin = connection.getAdmin();
			if (admin.tableExists(table)) {
				admin.disableTable(table);
				admin.deleteTable(table);
				isSuccess = true;
			}
		} finally {
			IOUtils.closeQuietly(admin);
		}
		return isSuccess;
	}
	
	/**
	 * 向指定表中插入資料
	 * 
	 * @param tableName
	 *            要插入資料的表名
	 * @param rowkey
	 *            指定要插入資料的表的行鍵
	 * @param family
	 *            指定要插入資料的表的列族family
	 * @param qualifier
	 *            要插入資料的qualifier
	 * @param value
	 *            要插入資料的值value
	 * */
	protected static  void putDataH(String tableName, String rowkey, String family,
			String qualifier, Object value) throws IOException {
		Admin admin = null;
		TableName tN = TableName.valueOf(tableName);
		admin = connection.getAdmin();
		if (admin.tableExists(tN)) {
			try (Table table = connection.getTable(TableName.valueOf(tableName
					.getBytes()))) {
				Put put = new Put(rowkey.getBytes());
				put.addColumn(family.getBytes(), qualifier.getBytes(),
						value.toString().getBytes());
				table.put(put);
			} catch (Exception e) {
				e.printStackTrace();
			}
		} else {
			System.out.println("插入資料的表不存在,請指定正確的tableName ! ");
		}
	}
	/**
	 * 根據指定表獲取指定行鍵rowkey和列族family的資料 並以字串的形式返回查詢到的結果
	 * 
	 * @param tableName
	 *            要獲取表 tableName 的表名
	 * @param rowKey
	 *            指定要獲取資料的行鍵
	 * @param family
	 *            指定要獲取資料的列族元素
	 * @param qualifier
	 *            指定要獲取資料的qualifier
	 *            
	 * */
	protected static String getValueBySeriesH(String tableName, String rowKey,
			String family,String qualifier) throws IllegalArgumentException, IOException {
		Table table = null;
		String resultStr = null;
		try {
			table = connection
					.getTable(TableName.valueOf(tableName.getBytes()));
			Get get = new Get(Bytes.toBytes(rowKey));
			if( !get.isCheckExistenceOnly()){
				get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
				Result res = table.get(get);
				byte[] result = res.getValue(Bytes.toBytes(family),
						Bytes.toBytes(qualifier));
				resultStr = Bytes.toString(result);
			}else{
				resultStr = null;
			}
		} finally {
			IOUtils.closeQuietly(table);
		}
		return resultStr;
	}
	
	/**
	 * 根據table查詢表中的所有資料 無返回值,直接在控制檯列印結果
	 * */
	@SuppressWarnings("deprecation")
	public void getValueByTable(String tableName) throws Exception {
		Table table = null;
		try {
			table = connection.getTable(TableName.valueOf(tableName));
			ResultScanner rs = table.getScanner(new Scan());
			for (Result r : rs) {
				System.out.println("獲得到rowkey:" + new String(r.getRow()));
				for (KeyValue keyValue : r.raw()) {
					System.out.println("列:" + new String(keyValue.getFamily())
							+ ":" + new String(keyValue.getQualifier())
							+ "====值:" + new String(keyValue.getValue()));
				}
			}
		} finally {
			IOUtils.closeQuietly(table);
		}
	}

	/**
	 * 根據指定表獲取指定行鍵rowKey和列族family的資料 並以Map集合的形式返回查詢到的結果
	 * 
	 * @param tableName
	 *            要獲取表 tableName 的表名
	 * @param rowKey
	 *            指定的行鍵rowKey
	 * @param family
	 *            指定列族family
	 * */
	
	protected static Map<String, String> getAllValueH(String tableName,
			String rowKey, String family) throws IllegalArgumentException, IOException {
		Table table = null;
		Map<String, String> resultMap = null;
		try {
			table = connection.getTable(TableName.valueOf(tableName));
			Get get = new Get(Bytes.toBytes(rowKey));
			if(get.isCheckExistenceOnly()){
				Result res = table.get(get);
				Map<byte[], byte[]> result = res.getFamilyMap(family.getBytes());
				Iterator<Entry<byte[], byte[]>> it = result.entrySet().iterator();
				resultMap = new HashMap<String, String>();
				while (it.hasNext()) {
					Entry<byte[], byte[]> entry = it.next();
					resultMap.put(Bytes.toString(entry.getKey()),
							Bytes.toString(entry.getValue()));
				}
			}
		} finally {
			IOUtils.closeQuietly(table);
		}
		return resultMap;
	}
	
 
	/**
	 * 根據指定表獲取指定行鍵rowKey的所有資料 並以Map集合的形式返回查詢到的結果
	 * 每條資料之間用&&&將Qualifier和Value進行區分
	 * @param tableName
	 *            要獲取表 tableName 的表名
	 * @param rowkey
	 *            指定的行鍵rowKey
	 * */
	public  ArrayList<String> getFromRowkeyValues(String tableName,  String rowkey){
		Table table =null;
		ArrayList<String> Resultlist = new ArrayList<>();
        Get get =  new  Get(Bytes. toBytes ( rowkey ));
        try {
			table = connection.getTable(TableName.valueOf(tableName));
			Result  r = table.get(get);
	         for  (Cell cell : r.rawCells()) {
	             //每條資料之間用&&&將Qualifier和Value進行區分
	        	 String reString = Bytes. toString (CellUtil. cloneQualifier (cell))+"&&&"+Bytes. toString (CellUtil. cloneValue (cell));
	        	 Resultlist.add(reString);
	        }
	        table.close();
		} catch (IOException e1) {
			e1.printStackTrace();
		}
        return Resultlist;
	}
	/**
	 * 根據表名獲取所有的資料
	 * */
	@SuppressWarnings("unused")
	private void getAllValues(String tableName){
		try {
			Table table= connection.getTable(TableName.valueOf(tableName));
			Scan scan = new Scan();
			ResultScanner resutScanner = table.getScanner(scan);
			for(Result result: resutScanner){
				System.out.println("scan:  " + result);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void getTestDate(String tableName) throws IOException{
		Table table = null;
		table = connection.getTable(TableName.valueOf(tableName));
		int count = 0;
		 Scan scan  = new Scan();
		 scan.addFamily("f".getBytes());
		 
		 Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, 
				                                                     new RegexStringComparator("112213.*"));
		 scan.setFilter(filter);
		 ResultScanner resultScanner = table.getScanner(scan);
		 for(Result result : resultScanner){
			 System.out.println(result);
			 count++;
		 }
		 System.out.println("INFO:Hbase::  測試結束!共有 " + count + "條資料");
	}
	
	
}