1. 程式人生 > >hadoop生態系統學習之路(五)hbase的簡單使用

hadoop生態系統學習之路(五)hbase的簡單使用

最近,參與了公司的一個大資料介面平臺的開發,具體的處理過程是這樣的。我們公司負責資料的入庫,也就是一個etl過程,使用MR將資料入到hive裡面,然後同步到impala,然後此介面平臺提供查詢介面,前臺會將sql語句以引數傳過來,然後介面平臺通過呼叫impala提供的java api介面,將資料查詢出來返回給使用者。另外,如果查詢的資料量很大,那麼前臺就會傳一個taskId過來,第一次只需將資料查詢出來,入到impala臨時表,下次再查便將資料返回。那麼,如何記錄此任務的狀態變化呢,這裡我們就使用到了hbase,以taskId為row key,然後建立一個列簇記錄狀態資訊。
下面,分以下幾步對hbase進行介紹。

一、hbase的基本原理

HBase是一個構建在HDFS上的分散式列儲存系統,主要用於海量結構化資料儲存。
hbase的特點:
1. 大,一個表可以有數十億行,上百萬列;
2. 無模式,每行都有一個可排序的主鍵和任意多的列,列可以根據需要動態的增加,同一張表中不同的行可以有截然不同的列;
3. 面向列,面向列(族)的儲存和許可權控制,列(族)獨立檢索;
4. 稀疏,空(null)列並不佔用儲存空間,表可以設計的非常稀疏;
5. 資料多版本,每個單元中的資料可以有多個版本,預設情況下版本號自動分配,是單元格插入時的時間戳;
6. 資料型別單一,Hbase中的資料都是字串,沒有型別。

下面,再來看看hbase相關的元件:
這裡寫圖片描述
Master:為Region server分配region,負責Region server的負載均衡,發現失效的Region server並重新分配其上的region,管理使用者對table的增刪改查操作。
RegionServer:Regionserver維護region,處理對這些region的IO請求,Regionserver負責切分在執行過程中變得過大的region。
Zookeeper:通過選舉,保證任何時候,叢集中只有一個master,Master與RegionServers 啟動時會向ZooKeeper註冊,存貯所有Region的定址入口,實時監控Region server的上線和下線資訊,並實時通知給Master,儲存HBase的schema和table元資料,預設情況下,HBase 管理ZooKeeper 例項,比如, 啟動或者停止ZooKeeper。Zookeeper的引入使得Master不再是單點故障。
大概的介紹下,關於hbase表結構,筆者下面再進行介紹。

二、hbase的常用命令

首先,我們可以執行hbase shell進入hbase命令列,如下:
這裡寫圖片描述
然後,執行list,可以看到所有的表,如下:
這裡寫圖片描述,接下來,我們可以describe ‘表名’來查看錶結構,如下:
這裡寫圖片描述
可以看到,這個表有一個列族info。
然後,我們可以使用scan ‘表名’來檢視,整張表的資料。
下面,我們使用get ‘result_info’,’test02’獲取表中某個row key的所有列值,如下:
這裡寫圖片描述
好了,就說這幾個命令,還有很多,大家可以查閱下,多練練就熟了。

三、hbase 的java api基本操作

hbase包依賴,如下:

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <hadoop.version>2.3.0-cdh5.0.0</hadoop.version>
      <hbase.version>0.96.1.1-cdh5.0.0</hbase.version>
      <hive.version>0.12.0-cdh5.0.0</hive.version>
    </properties>

        <!-- habase 相關jar-->
        <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>${hbase.version}</version>
          <exclusions>
              <exclusion>
                  <artifactId>jdk.tools</artifactId>
                  <groupId>jdk.tools</groupId>
              </exclusion>
          </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-thrift</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-testing-util</artifactId>
            <version>${hbase.version}</version>
            <scope>test</scope>
        </dependency>

首先,我直接貼出程式碼,如下:

package org.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * 關鍵點1_:將自動提交關閉,如果不關閉,每寫一條資料都會進行提交,是匯入資料較慢的做主要因素。
 * 關鍵點2:設定快取大小,當快取大於設定值時,hbase會自動提交。此處可自己嘗試大小,一般對大資料量,設定為5M即可,本文設定為3M。
 * 關鍵點3:每一個分片結束後都進行flushCommits(),如果不執行,當hbase最後快取小於上面設定值時,不會進行提交,導致資料丟失。
 * 
 * @author qiyongkang
 *
 */
public class Example {

    /**
     * 
     * insertBatch: 批量插入. <br/>
     *
     * @author qiyongkang
     * @throws IOException
     * @since JDK 1.6
     */
    public static void insertBatch() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");

        HTable htable = new HTable(config, "qyk_info");
        htable.setAutoFlush(false, false); // 關鍵點1
        htable.setWriteBufferSize(3 * 1024 * 1024); // 關鍵點2

        int num = 1;
        while (num <= 10) {
            Put put = new Put(Bytes.toBytes(num + ""));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("qyk" + num));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes(num + ""));
            htable.put(put);

            num++;

            if (num % 100 == 0) {
                System.out.println("..." + num);
            }
        }
        htable.flushCommits();// 關鍵點3
        htable.close();
    }

    /**
     * 
     * insertSingle:單個插入. <br/>
     *
     * @author qiyongkang
     * @throws IOException
     * @since JDK 1.6
     */
    public static void insertSingle() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");

        HTable htable = new HTable(config, "qyk_info");
        Put put = new Put(Bytes.toBytes("0"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("qyk" + 0));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("id"), Bytes.toBytes("0"));
        htable.put(put);

        htable.close();
    }

    /**
     * 
     * getData:根據row key獲取列資訊. <br/>
     *
     * @author qiyongkang
     * @throws IOException
     * @since JDK 1.6
     */
    public static void getData() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "172.31.25.8,172.31.25.2,172.31.25.3");

        HTable htable = new HTable(config, "qyk_info");

        Get get = new Get(Bytes.toBytes("1"));
        Result result = htable.get(get);

        String age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")));
        String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
        String id = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")));

        System.out.println("age:" + age + ",name:" + name + ",id:" + id);

        htable.close();
    }

    public static void main(String[] args) throws IOException {
        //單個插入
        insertSingle();

        //批量插入
        insertBatch();

        //根據row key獲取資料
        getData();
    }

}

分別對應三個操作,首先我們在hbase命令列執行create ‘qyk_info’, ‘info’建立表和列族,然後,再執行程式,可以看到控制檯如下:
這裡寫圖片描述
然後,我們執行scan ‘qyk_info’可以看到,如下:
這裡寫圖片描述
然後,我們使用單個插入,rowkey還是0,將id改為11,age改為19,執行單個插入。
然後,在命令列執行get ‘qyk_info’, ‘0’可以看到:
這裡寫圖片描述
其實,這個就是更新操作,cell中的值會有一個時間戳,每次顯示此列的最新值。
好了,關於hbase的基本使用就講到這兒了,比較粗淺,希望給大家帶來幫助!