1. 程式人生 > >hbase設計以及調優

hbase設計以及調優

1、表的設計

1.1、Column Family

由於Hbase是一個面向列族的儲存器,調優和儲存都是在列族這個層次上進行的,最好使列族成員都有相同的"訪問模式(access pattern)"和大小特徵
在一張表裡不要定義太多的column family。目前Hbase並不能很好的處理超過2~3個column family的表。因為某個column family在flush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終導致系統產生更多的I/O。

1.2、Row Key

Row Key 設計原則:
1)Rowkey長度原則,Rowkey是一個二進位制碼流,可以是任意字串,最大長度64KB,實際應用中一般為10~100bytes

,存為byte[]位元組陣列,一般設計成定長的建議是越短越好,不要超過16個位元組。原因一資料的持久化檔案HFile中是按照KeyValue儲存的,如果Rowkey過長比如100個位元組,1000萬列資料光Rowkey就要佔用100*1000萬=10億個位元組,將近1G資料,這會極大影響HFile的儲存效率;原因二MemStore將快取部分資料到記憶體,如果Rowkey欄位過長記憶體的有效利用率會降低,系統將無法快取更多的資料,這會降低檢索效率。因此Rowkey的位元組長度越短越好。原因三目前作業系統是都是64位系統,記憶體8位元組對齊。控制在16個位元組,8位元組的整數倍利用作業系統的最佳特性。
2)是Rowkey雜湊原則,如果Rowkey是按時間戳的方式遞增,不要將時間放在二進位制碼的前面
,建議將Rowkey的高位作為雜湊欄位,由程式迴圈生成,低位放時間欄位,這樣將提高資料均衡分佈在每個Regionserver實現負載均衡的機率。如果沒有雜湊欄位,首欄位直接是時間資訊將產生所有新資料都在一個RegionServer上堆積的熱點現象,這樣在做資料檢索的時候負載將會集中在個別RegionServer,降低查詢效率。
3)Rowkey唯一原則,必須在設計上保證其唯一性
row key是按照字典序儲存,因此,設計row key時,要充分利用這個排序特點,將經常一起讀取的資料儲存到一塊,將最近可能會被訪問的資料放在一塊。
舉個例子:如果最近寫入HBase表中的資料是最可能被訪問的,可以考慮將時間戳作為row key的一部分,由於是字典序排序,所以可以使用Long.MAX_VALUE – timestamp作為row key,這樣能保證新寫入的資料在讀取時可以被快速命中。

1.3、 In Memory

建立表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到RegionServer的快取中,保證在讀取的時候被cache命中。

1.4 、Max Version

建立表的時候,可以通過HColumnDescriptor.setMaxVersions(intmaxVersions)設定表中資料的最大版本,如果只需要儲存最新版本的資料,那麼可以設定setMaxVersions(1)。

1.5、 Time to Live(設定資料儲存的生命週期)

建立表的時候,可以通過HColumnDescriptor.setTimeToLive(inttimeToLive)設定表中資料的儲存生命期,過期資料將自動被刪除,例如如果只需要儲存最近兩天的資料,那麼可以設定setTimeToLive(2 * 24 * 60 * 60)。

1.6、 Compact & Split

在HBase中,資料在更新時首先寫入WAL 日誌(HLog)和記憶體(MemStore)中,MemStore中的資料是排序的,當MemStore累計到一定閾值時,就會建立一個新的MemStore,並且將老的MemStore新增到flush佇列,由單獨的執行緒flush到磁碟上,成為一個StoreFile。於此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻之前的變更已經持久化了(minor compact)。
StoreFile是隻讀的,一旦建立後就不可以再修改。因此Hbase的更新其實是不斷追加的操作。當一個Store中的StoreFile達到一定的閾值後,就會進行一次合併(major compact),將對同一個key的修改合併到一起,形成一個大的StoreFile,當StoreFile的大小達到一定閾值後,又會對 StoreFile進行分割(split),等分為兩個StoreFile。
由於對錶的更新是不斷追加的,處理讀請求時,需要訪問Store中全部的StoreFile和MemStore,將它們按照row key進行合併,由於StoreFile和MemStore都是經過排序的,並且StoreFile帶有記憶體中索引,通常合併過程還是比較快的。
實際應用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合併形成一個大的StoreFile。同時,可以將StoreFile設定大些,減少split的發生

1.7、 Pre-Creating Regions

預設情況下,在建立HBase表的時候會自動建立一個region分割槽,當匯入資料的時候,所有的HBase客戶端都向這一個region寫資料,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先建立一些空的regions,這樣當資料寫入HBase時,會按照region分割槽情況,在叢集內做資料的負載均衡。                             

  1. publicstatic booleancreateTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)  
  2. throws IOException {  
  3.   try {  
  4.     admin.createTable(table, splits);  
  5.     returntrue;  
  6.   } catch (TableExistsException e) {  
  7.     logger.info("table " +table.getNameAsString() + " already exists");  
  8.     // the table already exists...
  9.     returnfalse;  
  10.   }  
  11. }  
  12. publicstaticbyte[][]getHexSplits(String startKey, String endKey, int numRegions) {  
  13.   byte[][] splits = newbyte[numRegions-1][];  
  14.   BigInteger lowestKey = newBigInteger(startKey, 16);  
  15.   BigInteger highestKey = newBigInteger(endKey, 16);  
  16.   BigInteger range =highestKey.subtract(lowestKey);  
  17.   BigInteger regionIncrement =range.divide(BigInteger.valueOf(numRegions));  
  18.   lowestKey = lowestKey.add(regionIncrement);  
  19.   for(int i=0; i < numRegions-1;i++) {  
  20.     BigInteger key =lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));  
  21.     byte[] b = String.format("%016x",key).getBytes();  
  22.     splits[i] = b;  
  23.   }  
  24.   return splits;  
  25. }  

2、寫表操作

2.1 多HTable併發寫

建立多個HTable客戶端用於寫操作,提高寫資料的吞吐量,一個例子:

  1. staticfinal Configurationconf = HBaseConfiguration.create();  
  2. staticfinal Stringtable_log_name = “user_log”;  
  3. wTableLog = newHTable[tableN];  
  4. for (int i = 0; i <tableN; i++) {  
  5.     wTableLog[i] = new HTable(conf,table_log_name);  
  6.     wTableLog[i].setWriteBufferSize(5 * 1024 *1024); //5MB
  7.     wTableLog[i].setAutoFlush(false);  
  8. }  

2.2 HTable引數設定

2.2.1 Auto Flush

通過呼叫HTable.setAutoFlush(false)方法可以將HTable寫客戶端的自動flush關閉,這樣可以批量寫入資料到 HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫快取時,才實際向HBase服務端發起寫請求。預設情況下auto flush是開啟的。保證最後手動HTable.flushCommits()或HTable.close()

2.2.2 Write Buffer

通過呼叫HTable.setWriteBufferSize(writeBufferSize)方法可以設定 HTable客戶端的寫buffer大小,如果新設定的buffer小於當前寫buffer中的資料時,buffer將會被flush到服務端。其 中,writeBufferSize的單位是byte位元組數,可以根據實際寫入資料量的多少來設定該值。

2.2.3 WAL Flag

在HBae中,客戶端向叢集中的RegionServer提交資料時(Put/Delete操作),首先會先寫WAL(Write Ahead Log)日誌(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當WAL日誌寫成功後,再接著寫 MemStore,然後客戶端被通知提交資料成功;如果寫WAL日誌失敗,客戶端則被通知提交失敗。這樣做的好處是可以做到RegionServer宕機 後的資料恢復。

因此,對於相對不太重要的資料,可以在Put/Delete操作時,通過呼叫Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函式,放棄寫WAL日誌,從而提高資料寫入的效能。

值得注意的是:謹慎選擇關閉WAL日誌,因為這樣的話,一旦RegionServer宕機,Put/Delete的資料將會無法根據WAL日誌進行恢復。

2.3 批量寫

通過呼叫HTable.put(Put)方法可以將一個指定的row key記錄寫入HBase,同樣HBase提供了另一個方法:通過呼叫HTable.put(List<Put>)方法可以將指定的row key列表,批量寫入多行記錄,這樣做的好處是批量執行,只需要一次網路I/O開銷,這對於對資料實時性要求高,網路傳輸RTT高的情景下可能帶來明顯的效能提升。

2.4 多執行緒併發寫

在客戶端開啟多個HTable寫執行緒,每個寫執行緒負責一個HTable物件的flush操作,這樣結合定時flush和寫 buffer(writeBufferSize),可以既保證在資料量小的時候,資料可以在較短時間內被flush(如1秒內),同時又保證在資料量大的 時候,寫buffer一滿就及時進行flush。下面給個具體的例子:

  1. for (int i = 0; i <threadN; i++) {  
  2.     Thread th = new Thread() {  
  3.         publicvoid run() {  
  4.             while (true) {  
  5.                 try {  
  6.                     sleep(1000); //1 second
  7.                 } catch (InterruptedExceptione) {  
  8.                     e.printStackTrace();  
  9.                 }  
  10. synchronized (wTableLog[i]) {  
  11.                     try {  
  12.                         wTableLog[i].flushCommits();  
  13.                     } catch (IOException e) {  
  14.                         e.printStackTrace();  
  15.                     }  
  16.                 }  
  17.             }  
  18. }  
  19.     };  
  20.     th.setDaemon(true);  
  21.     th.start();  
  22. }  

3、讀表操作

3.1 多HTable併發讀

建立多個HTable客戶端用於讀操作,提高讀資料的吞吐量,一個例子:

  1. staticfinal Configurationconf = HBaseConfiguration.create();  
  2. staticfinal Stringtable_log_name = “user_log”;  
  3. rTableLog = newHTable[tableN];  
  4. for (int i = 0; i <tableN; i++) {  
  5.     rTableLog[i] = new HTable(conf, table_log_name);  
  6.     rTableLog[i].setScannerCaching(50);  
  7. }  

3.2 HTable引數設定

3.2.1 Scanner Caching

hbase.client.scanner.caching配置項可以設定HBase scanner一次從服務端抓取的資料條數,預設情況下一次一條。通過將其設定成一個合理的值,可以減少scan過程中next()的時間開銷,代價是 scanner需要通過客戶端的記憶體來維持這些被cache的行記錄。

有三個地方可以進行配置:1)在HBase的conf配置檔案中進行配置;2)通過呼叫HTable.setScannerCaching(int scannerCaching)進行配置;3)通過呼叫Scan.setCaching(int caching)進行配置。三者的優先順序越來越高。

3.2.2 Scan AttributeSelection

scan時指定需要的Column Family,可以減少網路傳輸資料量,否則預設scan操作會返回整行所有Column Family的資料。

3.2.3 Close ResultScanner

通過scan取完資料後,記得要關閉ResultScanner,否則RegionServer可能會出現問題(對應的Server資源無法釋放)。

3.3 批量讀

通過呼叫HTable.get(Get)方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過呼叫HTable.get(List<Get>)方法可以根據一個指定的rowkey列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網路I/O開銷,這對於對資料實時性要求高而且網路傳輸RTT高的情景下可能帶來明顯 的效能提升。

3.4 多執行緒併發讀

在客戶端開啟多個HTable讀執行緒,每個讀執行緒負責通過HTable物件進行get操作。下面是一個多執行緒併發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:

  1. publicclass DataReaderServer{  
  2.      //獲取店鋪一天內各分鐘PV值的入口函式
  3.      publicstatic ConcurrentHashMap<String,String> getUnitMinutePV(long uid, long startStamp, long endStamp){  
  4.          long min = startStamp;  
  5.          int count = (int)((endStamp -startStamp) / (60*1000));  
  6.          List<String> lst = newArrayList<String>();  
  7.          for (int i = 0; i <= count; i++) {  
  8.             min = startStamp + i * 60 * 1000;  
  9.             lst.add(uid + "_" + min);  
  10.          }  
  11.          return parallelBatchMinutePV(lst);  
  12.      }  
  13.       //多執行緒併發查詢,獲取分鐘PV值
  14. private staticConcurrentHashMap<String, String>parallelBatchMinutePV(List<String> lstKeys){  
  15.         ConcurrentHashMap<String, String>hashRet = new ConcurrentHashMap<String, String>();  
  16.         int parallel = 3;  
  17.         List<List<String>>lstBatchKeys  = null;  
  18.         if (lstKeys.size() < parallel ){  
  19.             lstBatchKeys  = new ArrayList<List<String>>(1);  
  20.             lstBatchKeys.add(lstKeys);  
  21.         }  
  22.         else{  
  23.             lstBatchKeys  = newArrayList<List<String>>(parallel);  
  24.             for(int i = 0; i < parallel;i++  ){  
  25.                 List<String> lst = newArrayList<String>();  
  26.                 lstBatchKeys.add(lst);  
  27.             }  
  28.             for(int i = 0 ; i <lstKeys.size() ; i ++ ){  
  29.                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));  
  30.             }  
  31.         }  
  32.         List<Future<ConcurrentHashMap<String, String> >> futures = newArrayList<Future< ConcurrentHashMap<String, String> >>(5);  
  33.         ThreadFactoryBuilder builder = newThreadFactoryBuilder();  
  34.        builder.setNameFormat("ParallelBatchQuery");  
  35.         ThreadFactory factory =builder.build();  
  36.         ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(),factory);  
  37.         for(List<String> keys :lstBatchKeys){  
  38.             Callable<ConcurrentHashMap<String, String> > callable = newBatchMinutePVCallable(keys);  
  39.             FutureTask<ConcurrentHashMap<String, String> > future = (FutureTask<ConcurrentHashMap<String, String> >) executor.submit(callable);  
  40.             futures.add(future);  
  41.         }  
  42.         executor.shutdown();  
  43.         // Wait for all the tasks to finish
  44.         try {  
  45.           boolean stillRunning = !executor.awaitTermination(  
  46.               5000000, TimeUnit.MILLISECONDS);  
  47.           if (stillRunning) {  
  48.             try {  
  49.                 executor.shutdownNow();  
  50.             } catch (Exception e) {  
  51.                 // TODO Auto-generated catchblock
  52.                 e.printStackTrace();  
  53.             }  
  54.           }  
  55.         } catch (InterruptedException e) {  
  56.           try {  
  57.              Thread.currentThread().interrupt();  
  58.           } catch (Exception e1) {  
  59.             // TODO Auto-generated catch block
  60.             e1.printStackTrace();  
  61.           }  
  62.         }  
  63.         // Look for any exception
  64.         for (Future f : futures) {  
  65.           try {  
  66.               if(f.get() != null)  
  67.               {  
  68.                  hashRet.putAll((ConcurrentHashMap<String, String>)f.get());  
  69.               }  
  70.           } catch (InterruptedException e) {  
  71.             try {  
  72.                 Thread.currentThread().interrupt();  
  73.             } catch (Exception e1) {  
  74.                 // TODO Auto-generated catchblock
  75.                 e1.printStackTrace();  
  76.             }  
  77.           } catch (ExecutionException e) {  
  78.             e.printStackTrace();  
  79.           }  
  80.         }  
  81.         return hashRet;  
  82.     }  
  83.      //一個執行緒批量查詢,獲取分鐘PV值
  84.     protected staticConcurrentHashMap<String, String> getBatchMinutePV(List<String>lstKeys){  
  85.         ConcurrentHashMap<String, String>hashRet = null;  
  86.         List<Get> lstGet = newArrayList<Get>();  
  87.         String[] splitValue = null;  
  88.         for (String s : lstKeys) {  
  89.             splitValue =s.split("_");  
  90.             long uid =Long.parseLong(splitValue[0]);  
  91.             long min =Long.parseLong(splitValue[1]);  
  92.             byte[] key = newbyte[16];  
  93.             Bytes.putLong(key, 0, uid);  
  94.             Bytes.putLong(key, 8, min);  
  95.             Get g = new Get(key);  
  96.             g.addFamily(fp);  
  97.             lstGet.add(g);  
  98.         }  
  99.         Result[] res = null;  
  100.         try {  
  101.             res =tableMinutePV[rand.nextInt(tableN)].get(lstGet);  
  102.         } catch (IOException e1) {  
  103.             logger.error("tableMinutePV exception,e=" + e1.getStackTrace());  
  104.         }  
  105.         if (res != null && res.length> 0) {  
  106.             hashRet = newConcurrentHashMap<String, String>(res.length);  
  107.             for (Result re : res) {  
  108.                 if (re != null &&!re.isEmpty()) {  
  109.                     try {  
  110.                         byte[] key =re.getRow();  
  111.                         byte[] value =re.getValue(fp, cp);  
  112.                         if (key != null&& value != null) {  
  113.                            hashRet.put(String.valueOf(Bytes.toLong(key,  
  114.                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes  
  115.                                    .toLong(value)));  
  116.                         }  
  117.                     } catch (Exception e2) {  
  118.                        logger.error(e2.getStackTrace());  
  119.                     }  
  120.                 }  
  121.             }  
  122.         }  
  123.         return hashRet;  
  124.     }  
  125. }  
  126. //呼叫介面類,實現Callable介面
  127. class BatchMinutePVCallableimplements Callable<ConcurrentHashMap<String, String>>{  
  128.      private List<String> keys;  
  129.      publicBatchMinutePVCallable(List<String> lstKeys ) {  
  130.          this.keys = lstKeys;  
  131.      }  
  132.      public ConcurrentHashMap<String,String> call() throws Exception {  
  133.          returnDataReadServer.getBatchMinutePV(keys);  
  134.      }  
  135. }  

3.5 快取查詢結果

對於頻繁查詢HBase的應用場景,可以考慮在應用程式中做快取,當有新的查詢請求時,首先在快取中查詢,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然後在應用程式中將查詢結果快取起來。至於快取的替換策略,可以考慮LRU等常用的策略。

3.6 Blockcache

HBase上Regionserver的記憶體分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用於讀。寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以後,會啟動 flush重新整理到磁碟。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush程序,從最大的Memstore開始flush直到低於限制。讀請求先到Memstore中查資料,查不到就到BlockCache中查,再查不到就會到磁碟上讀,並把讀的結果放入BlockCache。由於 BlockCache採用的是LRU策略,因此BlockCache達到上限(heapsize *hfile.block.cache.size * 0.85)後,會啟動淘汰機制,淘汰掉最老的一批資料。一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,否則HBase不能啟動。預設BlockCache為0.2,而Memstore為0.4。對於注重讀響應時間的系統,可以將 BlockCache設大些,比如設定BlockCache=0.4,Memstore=0.39,以加大快取的命中率

4、參考資料

http://blog.linezing.com/2012/03/hbase-performance-optimization(Hbase效能方法優化總結)