1. 程式人生 > >[Hbase]HBase架構詳解和資料的讀寫流程

[Hbase]HBase架構詳解和資料的讀寫流程

HBase架構圖理解

18.png

  1. HMaster連結Zookeeper的目得:HMaster需要知道哪些HRegionServere是活的及HRegionServer所在的位置,然後管理HRegionServer。
  2. HBase內部是通過DFS client把資料寫到HDFS上的
  3. 每一個HRegionServer有多個HRegion,每一個HRegion有多個Store,每一個Store對應一個列簇。
  4. HFile是HBase中KeyValue資料的儲存格式,HFile是Hadoop的二進位制格式檔案,StoreFile就是對HFile進行了封裝,然後進行資料的儲存。
  5. HStore由MemStore和StoreFile組成。
  6. HLog記錄資料的所有變更,可以用來做資料恢復。
  7. hdfs對應的目錄結構為 namespace->table->列簇->列->單元格

     

    17.png

寫資料流程

  1. zookeeper中儲存了meta表的region資訊,從meta表獲取相應region資訊,然後找到meta表的資料
  2. 根據namespace、表名和rowkey根據meta表的資料找到寫入資料對應的region資訊
  3. 找到對應的regionserver
  4. 把資料分別寫到HLog和MemStore上一份
  5. MemStore達到一個閾值後則把資料刷成一個StoreFile檔案。若MemStore中的資料有丟失,則可以總HLog上恢復
  6. 當多個StoreFile檔案達到一定的大小後,會觸發Compact合併操作,合併為一個StoreFile,這裡同時進行版本的合併和資料刪除。
  7. 當Compact後,逐步形成越來越大的StoreFIle後,會觸發Split操作,把當前的StoreFile分成兩個,這裡相當於把一個大的region分割成兩個region。如下圖:

     

    19.png

讀資料流程

  1. zookeeper中儲存了meta表的region資訊,所以先從zookeeper中找到meta表region的位置,然後讀取meta表中的資料。meta中又儲存了使用者表的region資訊。
  2. 根據namespace、表名和rowkey在meta表中找到對應的region資訊
  3. 找到這個region對應的regionserver
  4. 查詢對應的region
  5. 先從MemStore找資料,如果沒有,再到StoreFile上讀(為了讀取的效率)。

HBase Java API基本使用

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseClientTest {
    
    /*
     * 跟去表名獲取表的例項
     */
    public static HTable getTable (String name) throws Exception{
        //get the hbase conf instance
        Configuration conf = HBaseConfiguration.create();
        //get the hbase table instance
        HTable table = new HTable(conf, name);
        
        return table;
    }
    
    /**
     * get the data from the hbase table 
     * 
     * get 'tbname','rowkey','cf:col'
     * 
     * 列簇-》列名-》value-》timestamp
     */
    public static void getData(HTable table) throws Exception {
        // TODO Auto-generated method stub
        Get get = new Get(Bytes.toBytes("20161119_10003"));
        //conf the get 
        //get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        get.addFamily(Bytes.toBytes("info"));
        //load the get 
        Result rs = table.get(get);
        //print the data
        for(Cell cell : rs.rawCells()){
            System.out.println(
                    Bytes.toString(CellUtil.cloneFamily(cell))
                    +"->"+
                    Bytes.toString(CellUtil.cloneQualifier(cell))
                    +"->"+
                    Bytes.toString(CellUtil.cloneValue(cell))
                    +"->"+
                    cell.getTimestamp()
                    );
            System.out.println("------------------------------");
        }
        
    }
    
    /**
     * put the data to the hbase table 
     * 
     * put 'tbname','rowkey','cf:col','value'
     *      
     */
    public static void putData(HTable table) throws Exception {
        //get the put instance
        Put put = new Put(Bytes.toBytes("20161119_10003"));
        //conf the put
        put.add(
                Bytes.toBytes("info"), 
                Bytes.toBytes("age"), 
                Bytes.toBytes("20")
                );
        //load the put 
        table.put(put);
        //print
        getData(table);
    }
    
    /**
     * delete the data from the hbase table 
     * 
     * delete 'tbname','rowkey','cf:col'
     *      
     */
    public static void deleteData(HTable table) throws Exception {
        //get the delete instance
        Delete del = new Delete(Bytes.toBytes("20161119_10003"));
        //conf the del
        //del.deleteColumn(Bytes.toBytes("info"),Bytes.toBytes("age"));
        del.deleteColumns(Bytes.toBytes("info"),Bytes.toBytes("age"));
        //load the del
        table.delete(del);
        //print
        getData(table);
    }
    
    /**
     * scan the all table
     * scan 'tbname'
     *      
     */
    public static void scanData(HTable table) throws Exception {
        //get the scan instance
        Scan scan = new Scan();
        //load the scan
        ResultScanner rsscan = table.getScanner(scan);
        for(Result rs : rsscan){
            System.out.println(Bytes.toString(rs.getRow()));
            for(Cell cell : rs.rawCells()){
                System.out.println(
                        Bytes.toString(CellUtil.cloneFamily(cell))
                        +"->"+
                        Bytes.toString(CellUtil.cloneQualifier(cell))
                        +"->"+
                        Bytes.toString(CellUtil.cloneValue(cell))
                        +"->"+
                        cell.getTimestamp()
                        );
            }
            System.out.println("------------------------------");
        }
    }
    
    /**
     * scan the table  with limit
     * 
     * scan 'tbname',{STARTROW => 'row1',STOPROW => 'row2'}
     */
    public static void rangeData(HTable table) throws Exception {
        //get the scan instance
        Scan scan = new Scan();
        //conf the scan
            //scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
            //scan.addFamily(family);
            //scan.setStartRow(Bytes.toBytes("20161119_10002"));
            //scan.setStopRow(Bytes.toBytes("20161119_10003"));
        Filter filter = new PrefixFilter(Bytes.toBytes("2016111"));
        scan.setFilter(filter);
        //hbase conf
        //是否啟動快取
        scan.setCacheBlocks(true);
        //設定快取的條數
        scan.setCaching(100);
        //每一次取多少條
        scan.setBatch(10);
        //共同決定了請求RPC的次數
        
        //load the scan
        ResultScanner rsscan = table.getScanner(scan);
        for(Result rs : rsscan){
            System.out.println(Bytes.toString(rs.getRow()));
            for(Cell cell : rs.rawCells()){
                System.out.println(
                        Bytes.toString(CellUtil.cloneFamily(cell))
                        +"->"+
                        Bytes.toString(CellUtil.cloneQualifier(cell))
                        +"->"+
                        Bytes.toString(CellUtil.cloneValue(cell))
                        +"->"+
                        cell.getTimestamp()
                        );
            }
            System.out.println("------------------------------");
        }
    }
    
    public static void main(String[] args) throws Exception {
        HTable table = getTable("test:tb1");
        getData(table);
        putData(table);
        deleteData(table);
        scanData(table);
        rangeData(table);
    }   
}

HBase架構中各個模組的功能再次總結

  1. ** Client ** 整個HBase叢集的訪問入口; 使用HBase RPC機制與HMaster和HRegionServer進行通訊; 與HMaster進行通訊進行管理表的操作; 與HRegionServer進行資料讀寫類操作; 包含訪問HBase的介面,並維護cache來加快對HBase的訪問
  2. ** Zookeeper ** 保證任何時候,叢集中只有一個HMaster; 存貯所有HRegion的定址入口; 實時監控HRegion Server的上線和下線資訊,並實時通知給HMaster; 儲存HBase的schema和table元資料; Zookeeper Quorum儲存表地址、HMaster地址。
  3. ** HMaster ** HMaster沒有單點問題,HBase中可以啟動多個HMaster,通過Zookeeper的Master Election機制保證總有一個Master在執行,主負責Table和Region的管理工作。 管理使用者對錶的建立、刪除等操作; 管理HRegionServer的負載均衡,調整Region分佈; Region Split後,負責新Region的分佈; 在HRegionServer停機後,負責失效HRegionServer上Region遷移工作。
  4. ** HRegion Server ** 維護HRegion,處理對這些HRegion的IO請求,向HDFS檔案系統中讀寫資料; 負責切分在執行過程中變得過大的HRegion。 Client訪問hbase上資料的過程並不需要master參與(定址訪問Zookeeper和HRegion Server,資料讀寫訪問HRegione Server),HMaster僅僅維護這table和Region的元資料資訊,負載很低。

hbase與mapreduce的整合

可以把hbase表中的資料作為mapreduce計算框架的輸入,或者把mapreduce的計算結果輸出到hbase表中。 我們以hbase中自帶的mapreduce程式舉例

  1. 直接執行會發現報錯缺少jar包,所以執行前需引入環境變數
$ export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2 
$ export HADOOP_HOME=/opt/modules/hadoop-2.5.0  
# $HBASE_HOME/bin/hbase mapredcp可以列出hbase在yarn上執行所需的jar包
$ export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
  1. 執行示例
$ $HADOOP_HOME/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar rowcounter  test:tb1

HBase的資料遷移的importsv的使用

HBase資料來源於日誌檔案或者RDBMS,把資料遷移到HBase表中。常見的有三種方法:(1)使用HBase Put API;(2)使用HBase批量載入工具;(3)自定義MapReduce job實現。 importtsv是HBase官方提供的基於mapreduce的批量資料匯入工具,同時也是hbase提供的一個命令列工具,可以將儲存在HDFS上的自定義分隔符(預設是\t)的資料檔案,通過一條命令方便的匯入到HBase中。 ** 測試 **

  1. 準備資料檔案
[[email protected] datas]$ cat tb1.tsv 
10001   zhangsan        20
10002   lisi    22
10003   wangwu  30
  1. 把資料檔案上傳到hdsf上
$ bin/hdfs dfs -put /opt/datas/tb1.tsv /
  1. 在hbase中建立表> create 'student','info'
  2. 將HDFS中的資料匯入到hbase表中
$HADOOP_HOME/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar importtsv  -Dimporttsv.separator=\t -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age  student  /tb1.tsv

Dimporttsv.columns為指定分隔符 Dimporttsv.columns指定資料檔案中每一列如何對應表中的rowkey和列 /tb1.tsv為hdfs上的資料檔案的路徑

  1. 檢視執行結果
hbase(main):010:0> scan 'student'
ROW                       COLUMN+CELL                                                              
 10001                    column=info:age, timestamp=1480123167099, value=20                       
 10001                    column=info:name, timestamp=1480123167099, value=zhangsan                
 10002                    column=info:age, timestamp=1480123167099, value=22                       
 10002                    column=info:name, timestamp=1480123167099, value=lisi                    
2 row(s) in 0.8210 seconds