1. 程式人生 > >HBase性能優化方法總結(一)

HBase性能優化方法總結(一)

rec inter next memstore 不支持 lena cred 追加 查詢效率

表的設計

1.1 Pre-Creating Regions

默認情況下,在創建HBase表的時候會自動創建一個region分區,當導入數據的時候,所有的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先創建一些空的regions,這樣當數據寫入HBase時,會按照region分區情況,在集群內做數據的負載均衡。

有關預分區,詳情參見:Table Creation: Pre-Creating Regions,下面是一個例子:

public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)

throws IOException {
try {
admin.createTable(table, splits);
return true;
} catch (TableExistsException e) {
logger.info("table " + table.getNameAsString() + " already exists");
// the table already exists...
return false;
}
}

public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) { //start:001,endkey:100,10region [001,010]

[011,020]
byte[][] splits = new byte[numRegions-1][];
BigInteger lowestKey = new BigInteger(startKey, 16);
BigInteger highestKey = new BigInteger(endKey, 16);
BigInteger range = highestKey.subtract(lowestKey);
BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
lowestKey = lowestKey.add(regionIncrement);

for(int i=0; i < numRegions-1;i++) {
BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
byte[] b = String.format("%016x", key).getBytes();
splits[i] = b;
}
return splits;
}

1.2 Row Key

HBaserow key用來檢索表中的記錄,支持以下三種方式:

  • 通過單個row key訪問:即按照某個row key鍵值進行get操作;
  • 通過row keyrange進行scan:即通過設置startRowKeyendRowKey,在這個範圍內進行掃描;
  • 全表掃描:即直接掃描整張表中所有行記錄。

HBase中,row key可以是任意字符串,最大長度64KB,實際應用中一般為10~100bytes,存為byte[]字節數組,一般設計成定長的

row key是按照字典序存儲,因此,設計row key時,要充分利用這個排序特點,將經常一起讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。

舉個例子:如果最近寫入HBase表中的數據是最可能被訪問的,可以考慮將時間戳作為row key的一部分,由於是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作為row key,這樣能保證新寫入的數據在讀取時可以被快速命中。

Rowkey規則

1、 越小越好

2、 Rowkey的設計是要根據實際業務來

3、 散列性

a) 取反 001 002 100 200

b) Hash

1.3 Column Family

不要在一張表裏定義太多的column family。目前Hbase並不能很好的處理超過2~3column family的表。因為某個column familyflush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終導致系統產生更多的I/O。感興趣的同學可以對自己的HBase集群進行實際測試,從得到的測試結果數據驗證一下。

1.4 In Memory

創建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。

1.5 Max Version

創建表的時候,可以通過HColumnDescriptor.setMaxVersions(int maxVersions)設置表中數據的最大版本,如果只需要保存最新版本的數據,那麽可以設置setMaxVersions(1)

1.6 Time To Live

創建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過期數據將自動被刪除,例如如果只需要存儲最近兩天的數據,那麽可以設置setTimeToLive(2 * 24 * 60 * 60)

1.7 Compact & Split

HBase中,數據在更新時首先寫入WAL 日誌(HLog)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到一定閾值時,就會創建一個新的MemStore,並且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成為一個StoreFile。於此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻之前的變更已經持久化了(minor compact)

StoreFile是只讀的,一旦創建後就不可以再修改。因此Hbase的更新其實是不斷追加的操作。當一個Store中的StoreFile達到一定的閾值後,就會進行一次合並(major compact),將對同一個key的修改合並到一起,形成一個大的StoreFile,當StoreFile的大小達到一定閾值後,又會對 StoreFile進行分割(split),等分為兩個StoreFile

由於對表的更新是不斷追加的,處理讀請求時,需要訪問Store中全部的StoreFileMemStore,將它們按照row key進行合並,由於StoreFileMemStore都是經過排序的,並且StoreFile帶有內存中索引,通常合並過程還是比較快的。

實際應用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合並形成一個大的StoreFile。同時,可以將StoreFile設置大些,減少split的發生。

hbase為了防止小文件(被刷到磁盤的menstore)過多,以保證保證查詢效率,hbase需要在必要的時候將這些小的store file合並成相對較大的store file,這個過程就稱之為compaction。在hbase中,主要存在兩種類型的compactionminor compactionmajor compaction

minor compaction:的是較小、很少文件的合並。

major compaction 的功能是將所有的store file合並成一個,觸發major compaction的可能條件有:major_compact 命令、majorCompact() APIregion server自動運行(相關參數:hbase.hregion.majoucompaction 默認為24 小時、hbase.hregion.majorcompaction.jetter 默認值為0.2 防止region server 在同一時間進行major compaction)。

hbase.hregion.majorcompaction.jetter參數的作用是:對參數hbase.hregion.majoucompaction 規定的值起到浮動的作用,假如兩個參數都為默認值240,2,那麽major compact最終使用的數值為:19.2~28.8 這個範圍。

1、 關閉自動major compaction

2、 手動編程major compaction

Timer類,contab

minor compaction的運行機制要復雜一些,它由一下幾個參數共同決定:

hbase.hstore.compaction.min :默認值為 3,表示至少需要三個滿足條件的store file時,minor compaction才會啟動

hbase.hstore.compaction.max 默認值為10,表示一次minor compaction中最多選取10store file

hbase.hstore.compaction.min.size 表示文件大小小於該值的store file 一定會加入到minor compactionstore file

hbase.hstore.compaction.max.size 表示文件大小大於該值的store file 一定會被minor compaction排除

hbase.hstore.compaction.ratio store file 按照文件年齡排序(older to younger),minor compaction總是從older store file開始選擇

寫表操作

2.1 HTable並發寫

創建多個HTable客戶端用於寫操作,提高寫數據的吞吐量,一個例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
wTableLog[i] = new HTable(conf, table_log_name);
wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
wTableLog[i].setAutoFlush(false);
}

2.2 HTable參數設置

2.2.1 Auto Flush

通過調用HTable.setAutoFlush(false)方法可以將HTable寫客戶端的自動flush關閉,這樣可以批量寫入數據到HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫緩存時,才實際向HBase服務端發起寫請求。默認情況下auto flush是開啟的。

2.2.2 Write Buffer

通過調用HTable.setWriteBufferSize(writeBufferSize)方法可以設置HTable客戶端的寫buffer大小,如果新設置的buffer小於當前寫buffer中的數據時,buffer將會被flush到服務端。其中,writeBufferSize的單位是byte字節數,可以根據實際寫入數據量的多少來設置該值。

2.2.3 WAL Flag

HBae中,客戶端向集群中的RegionServer提交數據時(Put/Delete操作),首先會先寫WALWrite Ahead Log)日誌(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當WAL日誌寫成功後,再接著寫MemStore,然後客戶端被通知提交數據成功;如果寫WAL日誌失敗,客戶端則被通知提交失敗。這樣做的好處是可以做到RegionServer宕機後的數據恢復。

因此,對於相對不太重要的數據,可以在Put/Delete操作時,通過調用Put.setWriteToWAL(false)Delete.setWriteToWAL(false)函數,放棄寫WAL日誌,從而提高數據寫入的性能。

值得註意的是:謹慎選擇關閉WAL日誌,因為這樣的話,一旦RegionServer宕機,Put/Delete的數據將會無法根據WAL日誌進行恢復。

2.3 批量寫

通過調用HTable.put(Put)方法可以將一個指定的row key記錄寫入HBase,同樣HBase提供了另一個方法:通過調用HTable.put(List<Put>)方法可以將指定的row key列表,批量寫入多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對於對數據實時性要求高,網絡傳輸RTT高的情景下可能帶來明顯的性能提升。

2.4 多線程並發寫

在客戶端開啟多個HTable寫線程,每個寫線程負責一個HTable對象的flush操作,這樣結合定時flush和寫bufferwriteBufferSize),可以既保證在數據量小的時候,數據可以在較短時間內被flush(如1秒內),同時又保證在數據量大的時候,寫buffer一滿就及時進行flush。下面給個具體的例子:

for (int i = 0; i < threadN; i++) {
Thread th = new Thread() {
public void run() {
while (true) {
try {
sleep(1000); //1 second
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (wTableLog[i]) {
try {
wTableLog[i].flushCommits();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
th.setDaemon(true);
th.start();
}

讀表操作

3.1 HTable並發讀

創建多個HTable客戶端用於讀操作,提高讀數據的吞吐量,一個例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
rTableLog[i] = new HTable(conf, table_log_name);
rTableLog[i].setScannerCaching(50);
}

3.2 HTable參數設置

3.2.1 Scanner Caching

hbase.client.scanner.caching配置項可以設置HBase scanner一次從服務端抓取的數據條數,默認情況下一次一條。通過將其設置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是scanner需要通過客戶端的內存來維持這些被cache的行記錄。

有三個地方可以進行配置:1)在HBaseconf配置文件中進行配置;2)通過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)通過調用Scan.setCaching(int caching)進行配置。三者的優先級越來越高。

3.2.2 Scan Attribute Selection

scan時指定需要的Column Family,可以減少網絡傳輸數據量,否則默認scan操作會返回整行所有Column Family的數據。

3.2.3 Close ResultScanner

通過scan取完數據後,記得要關閉ResultScanner,否則RegionServer可能會出現問題(對應的Server資源無法釋放)。

3.3 批量讀

通過調用HTable.get(Get)方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調用HTable.get(List<Get>)方法可以根據一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對於對數據實時性要求高而且網絡傳輸RTT高的情景下可能帶來明顯的性能提升。

3.4 多線程並發讀

在客戶端開啟多個HTable讀線程,每個讀線程負責通過HTable對象進行get操作。下面是一個多線程並發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:

public class DataReaderServer {
//獲取店鋪一天內各分鐘PV值的入口函數
public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
long min = startStamp;
int count = (int)((endStamp - startStamp) / (60*1000));
List<String> lst = new ArrayList<String>();
for (int i = 0; i <= count; i++) {
min = startStamp + i * 60 * 1000;
lst.add(uid + "_" + min);
}
return parallelBatchMinutePV(lst);
}
//多線程並發查詢,獲取分鐘PV值
private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
int parallel = 3;
List<List<String>> lstBatchKeys = null;
if (lstKeys.size() < parallel ){
lstBatchKeys = new ArrayList<List<String>>(1);
lstBatchKeys.add(lstKeys);
}
else{
lstBatchKeys = new ArrayList<List<String>>(parallel);
for(int i = 0; i < parallel; i++ ){
List<String> lst = new ArrayList<String>();
lstBatchKeys.add(lst);
}

for(int i = 0 ; i < lstKeys.size() ; i ++ ){
lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
}
}

List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);

ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("ParallelBatchQuery");
ThreadFactory factory = builder.build();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);

for(List<String> keys : lstBatchKeys){
Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
futures.add(future);
}
executor.shutdown();

// Wait for all the tasks to finish
try {
boolean stillRunning = !executor.awaitTermination(
5000000, TimeUnit.MILLISECONDS);
if (stillRunning) {
try {
executor.shutdownNow();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}

// Look for any exception
for (Future f : futures) {
try {
if(f.get() != null)
{
hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}

return hashRet;
}
//一個線程批量查詢,獲取分鐘PV值
protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = null;
List<Get> lstGet = new ArrayList<Get>();
String[] splitValue = null;
for (String s : lstKeys) {
splitValue = s.split("_");
long uid = Long.parseLong(splitValue[0]);
long min = Long.parseLong(splitValue[1]);
byte[] key = new byte[16];
Bytes.putLong(key, 0, uid);
Bytes.putLong(key, 8, min);
Get g = new Get(key);
g.addFamily(fp);
lstGet.add(g);
}
Result[] res = null;
try {
res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
} catch (IOException e1) {
logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
}

if (res != null && res.length > 0) {
hashRet = new ConcurrentHashMap<String, String>(res.length);
for (Result re : res) {
if (re != null && !re.isEmpty()) {
try {
byte[] key = re.getRow();
byte[] value = re.getValue(fp, cp);
if (key != null && value != null) {
hashRet.put(String.valueOf(Bytes.toLong(key,
Bytes.SIZEOF_LONG)), String.valueOf(Bytes
.toLong(value)));
}
} catch (Exception e2) {
logger.error(e2.getStackTrace());
}
}
}
}

return hashRet;
}
}
//調用接口類,實現Callable接口
class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
private List<String> keys;

public BatchMinutePVCallable(List<String> lstKeys ) {
this.keys = lstKeys;
}

public ConcurrentHashMap<String, String> call() throws Exception {
return DataReadServer.getBatchMinutePV(keys);
}
}

3.5 緩存查詢結果

對於頻繁查詢HBase的應用場景,可以考慮在應用程序中做緩存,當有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然後在應用程序中將查詢結果緩存起來。至於緩存的替換策略,可以考慮LRU等常用的策略。

3.6 Blockcache

HBaseRegionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用於讀。

寫請求會先寫入MemstoreRegionserver會給每個region提供一個Memstore,當Memstore滿64MB以後,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低於限制。

讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,並把讀的結果放入BlockCache。由於BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)後,會啟動淘汰機制,淘汰掉最老的一批數據。

一個Regionserver上有一個BlockCacheNMemstore,它們的大小之和不能大於等於heapsize * 0.8,否則HBase不能啟動。默認BlockCache0.2,而Memstore0.4對於註重讀響應時間的系統,可以將 BlockCache設大些,比如設置BlockCache=0.4Memstore=0.39,以加大緩存的命中率。

有關BlockCache機制,請參考這裏:HBaseBlock cacheHBaseblockcache機制hbase中的緩存的計算與使用

HTableHTablePool使用註意事項

HTableHTablePool都是HBase客戶端API的一部分,可以使用它們對HBase表進行CRUD操作。下面結合在項目中的應用情況,對二者使用過程中的註意事項做一下概括總結。

Configuration conf = HBaseConfiguration.create();

try (Connection connection = ConnectionFactory.createConnection(conf)) {

try (Table table = connection.getTable(TableName.valueOf(tablename)) {

// use table as needed, the table returned is lightweight

}

}

四 HTable

HTableHBase客戶端與HBase服務端通訊的Java API對象,客戶端可以通過HTable對象與服務端進行CRUD操作(增刪改查)。它的創建很簡單:

Configuration conf = HBaseConfiguration.create();

HTable table = new HTable(conf, "tablename");

//TODO CRUD Operation……

HTable使用時的一些註意事項:

1. 規避HTable對象的創建開銷

因為客戶端創建HTable對象後,需要進行一系列的操作:檢查.META.表確認指定名稱的HBase表是否存在,表是否有效等等,整個時間開銷比較重,可能會耗時幾秒鐘之長,因此最好在程序啟動時一次性創建完成需要的HTable對象,如果使用Java API,一般來說是在構造函數中進行創建,程序啟動後直接重用。

2. HTable對象不是線程安全的

HTable對象對於客戶端讀寫數據來說不是線程安全的,因此多線程時,要為每個線程單獨創建復用一個HTable對象,不同對象間不要共享HTable對象使用,特別是在客戶端auto flash被置為false時,由於存在本地write buffer,可能導致數據不一致。

3. HTable對象之間共享Configuration

HTable對象共享Configuration對象,這樣的好處在於:

  • 共享ZooKeeper的連接:每個客戶端需要與ZooKeeper建立連接,查詢用戶的table regions位置,這些信息可以在連接建立後緩存起來共享使用;
  • 共享公共的資源:客戶端需要通過ZooKeeper查找-ROOT-.META.表,這個需要網絡傳輸開銷,客戶端緩存這些公共資源後能夠減少後續的網絡傳輸開銷,加快查找過程速度。

因此,與以下這種方式相比:

HTable table1 = new HTable("table1");

HTable table2 = new HTable("table2");

下面的方式更有效些:

Configuration conf = HBaseConfiguration.create();

HTable table1 = new HTable(conf, "table1");

HTable table2 = new HTable(conf, "table2");

備註:即使是高負載的多線程程序,也並沒有發現因為共享Configuration而導致的性能問題;如果你的實際情況中不是如此,那麽可以嘗試不共享Configuration

五 HTablePool

HTablePool可以解決HTable存在的線程不安全問題,同時通過維護固定數量的HTable對象,能夠在程序運行期間復用這些HTable資源對象。

Configuration conf = HBaseConfiguration.create();

HTablePool pool = new HTablePool(conf, 10);

1. HTablePool可以自動創建HTable對象,而且對客戶端來說使用上是完全透明的,可以避免多線程間數據並發修改問題。

2. HTablePool中的HTable對象之間是公用Configuration連接的,能夠可以減少網絡開銷。

HTablePool的使用很簡單:每次進行操作前,通過HTablePoolgetTable方法取得一個HTable對象,然後進行put/get/scan/delete等操作,最後通過HTablePoolputTable方法將HTable對象放回到HTablePool中。

下面是個使用HTablePool的簡單例子:

public void createUser(String username, String firstName, String lastName, String email, String password, String roles) throws IOException {

  HTable table = rm.getTable(UserTable.NAME);

  Put put = new Put(Bytes.toBytes(username));

  put.add(UserTable.DATA_FAMILY, UserTable.FIRSTNAME,

  Bytes.toBytes(firstName));

  put.add(UserTable.DATA_FAMILY, UserTable.LASTNAME,

    Bytes.toBytes(lastName));

  put.add(UserTable.DATA_FAMILY, UserTable.EMAIL, Bytes.toBytes(email));

  put.add(UserTable.DATA_FAMILY, UserTable.CREDENTIALS,

    Bytes.toBytes(password));

  put.add(UserTable.DATA_FAMILY, UserTable.ROLES, Bytes.toBytes(roles));

  table.put(put);

  table.flushCommits();

  rm.putTable(table);

}

HbaseDBMS比較:

查詢數據不靈活:

1、 不能使用column之間過濾查詢

2、 不支持全文索引。使用solrhbase整合完成全文搜索。

a) 使用MR批量讀取hbase中的數據,在solr裏面建立索引(no store)之保存rowkey的值。

b) 根據關鍵詞從索引中搜索到rowkey(分頁)

c) 根據rowkeyhbase查詢所有數據

HBase性能優化方法總結(一)