1. 程式人生 > >spark常見操作系列(3)--spark讀寫hbase(2)

spark常見操作系列(3)--spark讀寫hbase(2)

接著上一篇,
問題(2):

scan有

scan.setCaching(10000)

scan.setCacheBlocks(true)

等設定.setCaching ,個人感覺不夠用.hbase 預設是在記憶體裡面放一塊資料用來讀取,所以讀取效率比較高,可是,其餘大部分資料還是在硬碟中,這個記憶體資料塊的設定和意義,待清晰研究.

單節點hbase的寫入效率,有人粗估計,在3萬-5萬,這個效率是遠高於傳統資料庫的.一般讀效率比寫效率要高,我們之前使用3臺機器,查詢10000條資料,get方式,用了3秒.雖然中間有處理hdfs上csv檔案的過程,這還是讓本人感覺還有很大提升的地方.

如果我想要提高hbase的讀取效率,我該怎麼做?

hbase讀資料流程,放一張圖片:

這裡寫圖片描述

hbase讀資料,我的理解是:

(1)hbase 有個Blockcache記憶體塊,用來放一小部分硬碟上資料供隨時讀取,這個塊size可以配置;開啟方式,比如:

create'MyTable',{NAME =>'myCF',PREFETCH_BLOCKS_ON_OPEN =>'true'}

或者java方式:

HTableDescriptor tableDesc = new HTableDescriptor("myTable");
HColumnDescriptor cfDesc = new HColumnDescriptor("myCF");
cfDesc.setPrefetchBlocksOnOpen(true);
tableDesc.addFamily(cfDesc);

如果block塊中沒有結果資料,就得去硬碟上再載入一塊去block cache;

(2)hbase是用socket套接字連線,使用hbase api 操作,影響效率的因素兼具記憶體和網路IO等影響.

(3)每個client 請求,都會有多個rpc請求, rpc一批批的把結果資料返回至client端;
通過scan.setCaching(10000) 方式設定每一批資料行數. 本人一般設成10000,應用場景是,如果使用者查詢頻率較多,而結果資料較少,那麼不要設太大;如果使用者需要查詢過去幾年的資料做計算統計分析,可以適當設高一點.這個設定,其實本人感覺也是不確定較大.

覺得hbase 並沒有做成完全記憶體式的方式有些遺憾, 而且hbase的多項引數調優範圍不確定很高,但是,我是應該對hbase這個元件的某些內容做出自己的質疑呢,還是按部就班的按照hbase 開發者方面提供的調優建議一條一條來呢?

扯遠了.

hbase讀取調優

這裡分2個方面來說,
(1)程式碼配置和程式碼設計優化.
(2)hbase本身的調優項.

先說優化(1):程式碼配置和程式碼設計優化.

為此,本人找了一些資料,看上去,有一些對部分引數做了針對性的分析,可是這些調優項協調起來,對hbase整體性提高如何,並不清楚.

然而,我還是找到了一個比較系統的資料,Quora 上有推薦cloudera寫的系統化比較強的文件.是英文的,有條件的可以使用谷歌翻譯.現摘抄一些如下:

(一).使用位元組陣列的常量.

When people get started with HBase they have a tendency to write code that looks like this:

    Get get = new Get(rowkey);
    Result r = table.get(get);
    byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr"));  // returns current version of value

But especially when inside loops (and MapReduce jobs), converting the columnFamily and column-names to byte-arrays repeatedly is surprisingly expensive. It’s better to use constants for the byte-arrays, like this:

    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR = "attr".getBytes();
    ...
    Get get = new Get(rowkey);
    Result r = table.get(get);
    byte[] b = r.getValue(CF, ATTR);  // returns current version of value

意思就是用 “cf”.getBytes() 代替Bytes.toBytes(“cf”) ,這樣會少轉換一次.這種方式本人還沒嘗試過,聽起來可以一試.

(二) 批量載入

98.1. Batch Loading

Use the bulk load tool if you can. See Bulk Loading. Otherwise, pay attention to the below.

98.2. Table Creation: Pre-Creating Regions

Tables in HBase are initially created with one region by default. For bulk imports, this means that all clients will write to the same region until it is large enough to split and become distributed across the cluster. A useful pattern to speed up the bulk import process is to pre-create empty regions. Be somewhat conservative in this, because too-many regions can actually degrade performance.

There are two different approaches to pre-creating splits. The first approach is to rely on the default Admin strategy (which is implemented in Bytes.split)…

    byte[] startKey = ...;      // your lowest key
    byte[] endKey = ...;        // your highest key
    int numberOfRegions = ...;  // # of regions to create
    admin.createTable(table, startKey, endKey, numberOfRegions);

And the other approach is to define the splits yourself…

    byte[][] splits = ...;   // create your own splits
    admin.createTable(table, splits);

See Relationship Between RowKeys and Region Splits for issues related to understanding your keyspace and pre-creating regions. See manual region splitting decisions for discussion on manually pre-splitting regions.

一般讀寫hbase使用Mapreduce 或hbase Api 方式.

批量載入適用那些Mapreduce 來做批量任務. 由於減少了一些client請求環節,它的記憶體和網路IO消耗要比hbase api小一些.

本人理解的做法是,把Mapreduce 批量任務做出jar包形式,利用shell指令碼來執行會多一些.之前使用過的一個hbase 中介軟體: key-value store indexer ,也有一種Mapreduce批量處理的方式.

(三)scan caching

If HBase is used as an input source for a MapReduce job, for example, make sure that the input Scan instance to the MapReduce job has setCaching set to something greater than the default (which is 1). Using the default value means that the map-task will make call back to the region-server for every record processed. Setting this value to 500, for example, will transfer 500 rows at a time to the client to be processed. There is a cost/benefit to have the cache value be large because it costs more in memory for both client and RegionServer, so bigger isn’t always better.

這是必選項,一般通過

scan.setCaching(10000)

來設定.

(四)關閉ResultScanners

This isn’t so much about improving performance but rather avoiding performance problems. If you forget to close ResultScanners you can cause problems on the RegionServers. Always have ResultScanner processing enclosed in try/catch blocks.

    Scan scan = new Scan();
    // set attrs...
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result r = rs.next(); r != null; r = rs.next()) {
      // process result...
    } finally {
      rs.close();  // always close the ResultScanner!
    }
    table.close();

(五)使用Bloom過濾器

Enabling Bloom Filters can save your having to go to disk and can help improve read latencies.

Bloom filters were developed over in HBase-1200 Add bloomfilters. For description of the development process — why static blooms rather than dynamic — and for an overview of the unique properties that pertain to blooms in HBase, as well as possible future directions, see the Development Process section of the document BloomFilters in HBase attached to HBASE-1200. The bloom filters described here are actually version two of blooms in HBase. In versions up to 0.19.x, HBase had a dynamic bloom option based on work done by the European Commission One-Lab Project 034819. The core of the HBase bloom work was later pulled up into Hadoop to implement org.apache.hadoop.io.BloomMapFile. Version 1 of HBase blooms never worked that well. Version 2 is a rewrite from scratch though again it starts with the one-lab work.

布隆過濾器,這個,本人感覺,是個比較好的建議,值得嘗試.我找到一個測試對比表格,可以簡單看下它的效果.

使用航空公司的交通 資料 進行布隆過濾器實驗。HBase表中載入了大約500萬條來自該資料集的記錄。以下是結果:

布隆過濾器效果對比

可以通過以下配置設定布隆過濾器:

io.hfile.bloom.enabled global kill switch

io.hfile.bloom.enabled in Configuration serves as the kill switch in case something goes wrong. Default = true.

io.hfile.bloom.error.rate

io.hfile.bloom.error.rate = average false positive rate. Default = 1%. Decrease rate by ½ (e.g. to .5%) == +1 bit per bloom entry.

io.hfile.bloom.max.fold

io.hfile.bloom.max.fold = guaranteed minimum fold rate. Most people should leave this alone. Default = 7, or can collapse to at least 1/128th of original size. See the Development Process section of the document BloomFilters in HBase for more on what this option means.

就是:
io.hfile.bloom.enabled 預設true
io.hfile.bloom.error.rate 預設1%, 減小到0.5%
io.hfile.bloom.max.fold 預設7,儘量減小該值.

感覺細節是很多的.這些方面,本人也是在學習中.
先寫到這裡吧.

hbase檢索調優的2個問題總結

首先,hbase的使用環境相比mysql等較為複雜,而且一般為分散式的架構,在實際中,如果使用不好,並不會帶來多大好處,反而適得其反。現總結一下本人使用hbase的心得。

問題1:基於rowkey的條件檢索。
問題2:檢索的列不確定,或多或少。

hbase一般檢索使用rowkey,類似於唯一id。如果知道rowkey的值,查詢的效率最高。

比如,檢索某一行:

hbase(main)> get 't1','rowkey001'

一般檢索中,極少有直接知道rowkey的值的,因此,這種檢索方式不實用。

一般檢索多是條件檢索,或相關檢索。

對於問題1,我們對此做了一些探討和設計,具體有:

  1. 將重要的列拼接成rowkey,如分割槽號,編號,時間等。

  2. 簡化查詢列的命名,比如編號,統一為6位數格式;性別,統一為1或0;某條件只有3個分支,則統一為1,2,3.

  3. 比較高表和寬表的適用性,根據情況,擇優使用。

  4. 一個表減少列族的數量。

  5. 分析每個單元儲存什麼資料,優化單元的資料結構,使其緊湊。

以上的優化,針對的多是資料的總量很大,而資料的列不是很多的情況。如果資料的列很多,而檢索條件又是不確定,可能某次檢索需要幾列,又可能需要十幾列,幾十列。隨著一次性檢索的列數的增加,每次IO的流量相應增加,由於hbase的分散式架構,如果客戶端是處於外網的環境,IO效率可能很糟糕,其網路也隨之需要優化。另一方面,hbase不適合類SQL查詢。

針對問題2,如果一開始完全沿用基於rowkey的條件檢索,雖然看起來沒有問題,但在實際操作中,卻碰到了一些問題,比如檢索條件受限,檢索效率問題,rpc 網路超時問題。

就這些問題,需要更強大的檢索架構。本人覺得可能的思路有2種:

  1. 使用搜索引擎,和以elasticsearch為代表的搜尋引擎搭配使用。

  2. 使用spark on hive。

使用搜索引擎,無疑會很大提高檢索效率,也無需在乎每日增加的資料量。但核心缺點在於記憶體使用量大,有一定門檻。

使用hive,需要很強的設計能力,hadoop on hive,雖能解決寫入和檢索,可是實際效率相對比較低,存在優化瓶頸;spark on hive,同樣存在記憶體使用量大,spark調優也需要精確控制。但目前spark sql可以支援hiveql語法解析器。這裡筆者探討的是檢索,而spark sql或hive,已進入BI分析的範疇,就不費過多筆墨了。