1. 程式人生 > >使用Java API連線和操作HBase資料庫

使用Java API連線和操作HBase資料庫

建立的資料庫儲存如下資料

這裡寫圖片描述

表結構

這裡寫圖片描述

java程式碼

public class HbaseTest {

    /**
     * 配置ss
     */
    static Configuration config = null;
    private Connection connection = null;
    private Table table = null;

    @Before
    public void init() throws Exception {
        config = HBaseConfiguration.create();// 配置
config.set("hbase.zookeeper.quorum", "192.168.33.61");// zookeeper地址 config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper埠 connection = ConnectionFactory.createConnection(config); table = connection.getTable(TableName.valueOf("dept")); } /** * 建立資料庫表dept,並增加列族info和subdept * * @throws
Exception */
@Test public void createTable() throws Exception { // 建立表管理類 HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理 // 建立表描述類 TableName tableName = TableName.valueOf("dept"); // 表名稱 HTableDescriptor desc = new HTableDescriptor(tableName); // 建立列族的描述類
HColumnDescriptor family = new HColumnDescriptor("info"); // 列族 // 將列族新增到表中 desc.addFamily(family); HColumnDescriptor family2 = new HColumnDescriptor("subdept"); // 列族 // 將列族新增到表中 desc.addFamily(family2); // 建立表 admin.createTable(desc); // 建立表 System.out.println("建立表成功!"); } /** * 向hbase中插入前三行網路部、開發部、測試部的相關資料, * 即加入表中的前三條資料 * * @throws Exception */ @SuppressWarnings({ "deprecation", "resource" }) @Test public void insertData() throws Exception { table.setAutoFlushTo(false); table.setWriteBufferSize(534534534); ArrayList<Put> arrayList = new ArrayList<Put>(); Put put = new Put(Bytes.toBytes("0_1")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("網路部")); put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept1"), Bytes.toBytes("1_1")); put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept2"), Bytes.toBytes("1_2")); arrayList.add(put); Put put1 = new Put(Bytes.toBytes("1_1")); put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部")); put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); Put put2 = new Put(Bytes.toBytes("1_2")); put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試部")); put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); for (int i = 1; i <= 100; i++) { put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("2_"+i)); put2.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("3_"+i)); } arrayList.add(put1); arrayList.add(put2); //插入資料 table.put(arrayList); //提交 table.flushCommits(); System.out.println("資料插入成功!"); } /** * 向hbase中插入開發部、測試部下的所有子部門資料 * @throws Exception */ @Test public void insertOtherData() throws Exception { table.setAutoFlushTo(false); table.setWriteBufferSize(534534534); ArrayList<Put> arrayList = new ArrayList<Put>(); for (int i = 1; i <= 100; i++) { Put put_development = new Put(Bytes.toBytes("2_"+i)); put_development.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發"+i+"組")); put_development.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_1")); arrayList.add(put_development); Put put_test = new Put(Bytes.toBytes("3_"+i)); put_test.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試"+i+"組")); put_test.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_2")); arrayList.add(put_test); } //插入資料 table.put(arrayList); //提交 table.flushCommits(); System.out.println("插入其他資料成功!"); } /** * 查詢所有一級部門(沒有上級部門的部門) * @throws Exception */ @Test public void scanDataStep1() throws Exception { // 建立全表掃描的scan Scan scan = new Scan(); System.out.println("查詢到的所有一級部門如下:"); // 列印結果集 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("f_pid")) == null) { for (KeyValue kv : result.raw()) { System.out.print(new String(kv.getRow()) + " "); System.out.print(new String(kv.getFamily()) + ":"); System.out.print(new String(kv.getQualifier()) + " = "); System.out.print(new String(kv.getValue())); System.out.print(" timestamp = " + kv.getTimestamp() + "\n"); } } } } /** * 已知rowkey,查詢該部門的所有(直接)子部門資訊 rowkey=1_1 * @throws Exception */ @Test public void scanDataStep2() throws Exception { Get g = new Get("1_1".getBytes()); g.addFamily("subdept".getBytes()); // 列印結果集 Result result = table.get(g); for (KeyValue kv : result.raw()) { Get g1 = new Get(kv.getValue()); Result result1 = table.get(g1); for (KeyValue kv1 : result1.raw()) { System.out.print(new String(kv1.getRow()) + " "); System.out.print(new String(kv1.getFamily()) + ":"); System.out.print(new String(kv1.getQualifier()) + " = "); System.out.print(new String(kv1.getValue())); System.out.print(" timestamp = " + kv1.getTimestamp() + "\n"); } } } /** * 已知rowkey,向該部門增加一個子部門 * rowkey:0_1 * 增加的部門名:我增加的部門 * @throws Exception */ @Test public void scanDataStep3() throws Exception { //新增一個部門 Put put = new Put(Bytes.toBytes("4_1")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("我增加的部門")); put.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1")); //插入資料 table.put(put); //提交 table.flushCommits(); //更新網路部 Put put1 = new Put(Bytes.toBytes("0_1")); put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept3"), Bytes.toBytes("4_1")); //插入資料 table.put(put1); //提交 table.flushCommits(); } /** * 已知rowkey(且該部門存在子部門),刪除該部門資訊,該部門所有(直接)子部門被調整到其他部門中 * @throws Exception */ @Test public void scanDataStep4() throws Exception { /** * 向部門"我增加的部門"新增兩個子部門" */ table.setAutoFlushTo(false); table.setWriteBufferSize(534534534); ArrayList<Put> arrayList = new ArrayList<Put>(); Put put1 = new Put(Bytes.toBytes("5_1")); put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門1")); put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1")); Put put2 = new Put(Bytes.toBytes("5_2")); put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門2")); put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1")); arrayList.add(put1); arrayList.add(put2); //插入資料 table.put(arrayList); //提交 table.flushCommits(); /** * 目的:刪除"我增加的部門"的部門資訊,該部門所有(直接)子部門被調整到其他部門中 * 使用策略:更新部門名就可以了,也就是說一個部門可能有多個rowkey */ Put put = new Put(Bytes.toBytes("4_1")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部")); //插入資料 table.put(put); //提交 table.flushCommits(); } @After public void close() throws Exception { table.close(); connection.close(); } }