1. 程式人生 > >HBase BlockCache機制講解

HBase BlockCache機制講解

HBase上RegionServer的cache主要分為兩個部分,分別是memstore&blockcache,其中memstore主要用於寫快取,而blockcache用於讀快取。

當資料寫入hbase時,會先寫入memstore,RegionServer會給每個region提供一個memstore,memstore中的資料達到系統設定的水位值後,會觸發flush將memstore中的資料刷寫到磁碟。

客戶的讀請求會先到memstore中查資料,若查不到就到blockcache中查,再查不到就會從磁碟上讀,並把讀入的資料同時放入blockcahce。我們知道快取有三種不同的更新策略,分別是先入先出(FIFO)、LRU(最近最少使用)和LFU(最近最不常使用),hbase的block使用的是LRU策略,當BlockCache的大小達到上限後,會觸發快取淘汰機制,將最老的一批資料淘汰掉。

一個RegionServer上有一個BlockCache和N個Memstore。下面我們從hbase的原始碼中展開闡述Blockcache的具體實現,並在講解實現的中間補充闡述關於快取的相關機制介紹。

BlockCache在HBase中所處的位置如下圖中所示:

BlockCache的實現是基於On-heap ConcurrentHashMap。map的key是BlockCacheKey型別的物件,包括了offset、hfileName等成員變數,map的value是LruCachedBlock型別的物件,表示快取的實體,該物件中定義了成員變數accesstime,用於LRU淘汰時的比較依據。BlockCache的大小是固定的,由引數hfile.block.cache.size決定,預設是RegionServer的堆記憶體的40%。

BlockCache的初始化在HRegionServer的handleReportForDutyResponse裡完成,HRegionServer有一個HeapMemoryManager型別的成員變數,用於管理RegionServer程序的堆記憶體,HeapMemoryManager中的blockCache就是RegionServer中的讀快取,它的初始化在CacheConfig的instantiateBlockCache方法中完成,剪掉一些判斷BlockCache是否禁用的程式碼,我們列出其中的主要邏輯如下:

 

static synchronized BlockCache instantiateBlockCache(Configuration conf) {
  MemoryUsage mu 
= ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); LruBlockCache l1 = getL1(conf, mu); BlockCache l2 = getL2(conf, mu); if (l2 == null) { GLOBAL_BLOCK_CACHE_INSTANCE = l1; } else { boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED); if (useExternal) { GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); } else { if (combinedWithLru) { GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); } else { GLOBAL_BLOCK_CACHE_INSTANCE = l1; } } l1.setVictimCache(l2); } return GLOBAL_BLOCK_CACHE_INSTANCE; }


 

其中的GLOBAL_BLOCK_CACHE_INSTANCE是CacheConfig中維護的靜態BlockCache例項,也就是我們要返回給RegionServer的讀快取。注意程式碼中對useExternal和combinedWithLru的判斷,如果指定了useExternal為true,則結合memcached等外部快取與BlockCache一起使用。如果指定了combinedWithLru,則結合bucketCache,也就是堆外記憶體與BlockCache一起使用。在上述兩種情況下,BlockCache用於存放索引等元資料,真實的資料檔案則快取在memcached或bucketCache中。

如果想使用上述兩種特性,可以分別將"hbase.block.use.external"或"hbase.bucketcache.combinedcache.enabled"設定為true。其中,外部快取物件經hbase.blockcache.external.class由反射方法注入,關於hbase中外部快取的使用可以參看HBase的issue13170,裡面有詳細的介紹。

BlockCache基於客戶端對資料的訪問頻率,定義了三個不同的優先順序,如下所示:

SINGLE:如果一個Block被第一次訪問,則該Block被放在這一優先順序佇列中;

MULTI:如果一個Block被多次訪問,則從single移到Multi中;

MEMORY:memory優先順序由使用者指定,一般不推薦,只用系統表才使用memory優先順序;

以上將cache分級的好處在於:

首先,通過Memory型別的cache,可以將重要的資料放到RegionServer記憶體中常駐,例如Meta或者namespace的元資料資訊;

其次,通過區分single和multi型別cache,可以防止由於scan操作帶來的cache頻繁顛簸,將最少使用的block加入到淘汰演算法中;

預設配置下,對於整個blockcache的記憶體,按照以下百分比分配給single、multi和inMemory使用:0.25、0.5和0.25;

下面我們分析將Block塊加入快取的實現,主要程式碼如下所示:

  

public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
    final boolean cacheDataInL1) {
  //首先判斷cacheKey是否已被快取,省略

  LruCachedBlock cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);  //建立BlockCache中的實體
  long newSize = updateSizeMetrics(cb, false);  //更新metrics
  map.put(cacheKey, cb);
  
  if (newSize > acceptableSize() && !evictionInProgress) { //如果cache達到大小限制,執行evict邏輯
    runEviction();
  }
}

 

1、首先假設不會對同一個已經被快取的BlockCacheKey重複放入cache操作;

2、根據是否inmemory建立不同類別的CachedBlock物件:若inMemory為true則建立BlockPriority.MEMORY型別,否則建立BlockPriority.SINGLE型別;

3、將BlockCacheKey和建立的CachedBlock物件加入到前文說過的ConcurrentHashMap中,同時更新log&metrics上的計數;

4、最後判斷如果加入新block後cache size大於設定的臨界值且當前沒有淘汰執行緒執行,則呼叫runEviction()方法啟動LRU淘汰執行緒,runEviciton方法如下:

private void runEviction() { if (evictionThread == null) { evict(); } else { evictionThread.evict(); } }

其中淘汰執行緒evictionThread在LruBlockCache初始化的同時建立,並且指定為守護daemon執行緒;

evictionThread用於與主執行緒同步完成block cache的淘汰過程,該過程的主要邏輯在run方法中:

public void run() { enteringRun = true; while (this.go) { synchronized(this) { try { this.wait(1000 * 10/*Don't wait for ever*/); } catch(InterruptedException e) { LOG.warn("Interrupted eviction thread ", e); Thread.currentThread().interrupt(); } } LruBlockCache cache = this.cache.get(); if (cache == null) break; cache.evict(); } }

evictionThread執行緒啟動後,呼叫wait被阻塞住,直到EvictionThread執行緒的evict方法被runEviction呼叫後,evict中執行notifyAll喚醒被阻塞住的evictionThread主執行緒,主執行緒繼續執行LruBlockCache的evict方法進行真正的淘汰過程。evict方法的主流程如下所示:

 

void evict() {
  if(!evictionLock.tryLock()) return; try { evictionInProgress = true; long currentSize = this.size.get(); long bytesToFree = currentSize - minSize(); if(bytesToFree <= 0) return; BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize()); BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); for(LruCachedBlock cachedBlock : map.values()) { switch(cachedBlock.getPriority()) { case SINGLE: { bucketSingle.add(cachedBlock); break; } case MULTI: { bucketMulti.add(cachedBlock); break; } case MEMORY: { bucketMemory.add(cachedBlock); break; } } } long bytesFreed = 0; if (forceInMemory || memoryFactor > 0.999f) { long s = bucketSingle.totalSize(); long m = bucketMulti.totalSize(); if (bytesToFree > (s + m)) { bytesFreed = bucketSingle.free(s); bytesFreed += bucketMulti.free(m); bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); } else { long bytesRemain = s + m - bytesToFree; if (3 * s <= bytesRemain) { bytesFreed = bucketMulti.free(bytesToFree); } else if (3 * m <= 2 * bytesRemain) { bytesFreed = bucketSingle.free(bytesToFree); } else { bytesFreed = bucketSingle.free(s - bytesRemain / 3); if (bytesFreed < bytesToFree) { bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); } } } } else { PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); bucketQueue.add(bucketMemory); int remainingBuckets = 3; BlockBucket bucket; while((bucket = bucketQueue.poll()) != null) { long overflow = bucket.overflow(); if(overflow > 0) { long bucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); bytesFreed += bucket.free(bucketBytesToFree); } remainingBuckets--; } } } finally { stats.evict(); evictionInProgress = false; evictionLock.unlock(); } }

下面我們跟著程式碼詳解evict中每一步的實現及含義:

1、首先獲取鎖,保證同一時刻只有一個淘汰執行緒正在執行;

2、計算得到當前block cache總大小currentSize以及需要被淘汰釋放掉的大小bytesToFree,如果bytesToFree小於等於0則不進行後續操作;

3、初始化建立三個BlockBucket物件,物件中包含了一個元素為LruCachedBlock的MinMaxPriorityQueue佇列,分別用於三種優先順序的cahceBlock物件,佇列按LRU(最近最少使用)的原則維護BlockBucket中快取住的所有物件;

4、遍歷全域性ConcurrentHashMap中的所有BlockCache,依型別加入到相應的BlockBucket佇列中;

5、如果指定了放入最高優先順序memory,則根據single、multi和bytesFreed三者之間的關係計算在各個佇列中需要釋放的空間,此種情況不推薦,因此不再細述;

6、將以上三個BlockBucket佇列加入到一個優先順序佇列bucketQueue中,佇列按照各個BlockBucket超出指定bucketSize的大小(overflow)順序排序;

7、遍歷優先順序佇列,對於每個BlockBucket,通過Math.min(overflow,(bytesToFree - bytesFreed)/remainingBuckets)計算出需要釋放的空間大小,這樣做可以保證儘可能平均地從三個BlockBucket釋放LruCachedBlock。釋放過程在BlockBucket的free方法;

8、具體到free方法,它每次從BlockBucket維護的隊尾取出一個LruCachedBlock物件並呼叫evictBlock方法,evictBlock將LruCachedBlock從全域性的concurrentHashMap中移除,同時更新相關計數;

9、如果有bucketCache或者memcached等其它輔助快取,第八步總淘汰掉的CacheBlock會進入輔助快取;

前面講過一個LruCachedBlock如果被多次連續訪問,那麼它會從SINGLE優先順序升級到MULTI優先順序,這部分邏輯在getBlock中實現,getBlock接收使用者傳入的BlockCacheKey,並返回該Key制定的LruCachedBlock,如果目標LruCachedBlock在全域性map中存在,那麼會觸發LruCachedBlock的access執行,將目標LruCachedBlock從SINGLE優先順序調整為MULTI。

 

public void access(long accessTime) { this.accessTime = accessTime; if(this.priority == BlockPriority.SINGLE) { this.priority = BlockPriority.MULTI; } }

引數accessTime指定了呼叫access的次數,每呼叫access一次,accessTime就自增1,這是由於LruCachedBlock在BlockBucket中是按照升序排列的。accessTime越大,則LruCachedBlock在佇列中的位置越靠後,執行淘汰時,就越晚被淘汰。

需要注意的是在BlockCache中的資料是經過decompressed(解壓縮)的,使用者可將如下配置修改為true,當其為true時讀入blockcache中的資料不會經過解壓縮,如此可以增大單位blockcache中可快取的資料條數,但是使用者讀取資料資料需要解壓縮。

 

hbase.block.data.cachecompressed

上述配置的預設值是false。