1. 程式人生 > >HBase建表高階屬性,hbase應用案例看行鍵設計,HBase和mapreduce結合,從Hbase中讀取資料、分析,寫入hdfs,從hdfs中讀取資料寫入Hbase,協處理器和二級索引

HBase建表高階屬性,hbase應用案例看行鍵設計,HBase和mapreduce結合,從Hbase中讀取資料、分析,寫入hdfs,從hdfs中讀取資料寫入Hbase,協處理器和二級索引

1. Hbase高階應用

1.1建表高階屬性

下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性

1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式
布隆過濾可以每列族單獨啟用。
使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 對列族單獨啟用布隆。
 Default = ROW 對行進行布隆過濾。
 對 ROW,行鍵的雜湊在每次插入行時將被新增到布隆。
 對 ROWCOL,行鍵 + 列族 + 列族修飾的雜湊將在每次插入行時新增到布隆
使用方法: create ‘table’,{BLOOMFILTER =>’ROW’}
啟用布隆過濾可以節省讀磁碟過程,可以有助於降低讀取延遲

2、 VERSIONS 預設是1 這個引數的意思是資料保留1個 版本,如果我們認為我們的資料沒有這麼大的必要保留這麼多,隨時都在更新,而老版本的資料對我們毫無價值,那將此引數設為1 能節約2/3的空間
使用方法: create ‘table’,{VERSIONS=>’2’}

附:MIN_VERSIONS => ‘0’是說在compact操作執行之後,至少要保留的版本

3、 COMPRESSION 預設值是NONE 即不使用壓縮
這個引數意思是該列族是否採用壓縮,採用什麼壓縮演算法
使用方法: create ‘table’,{NAME=>’info’,COMPRESSION=>’SNAPPY’}
建議採用SNAPPY壓縮演算法
HBase中,在Snappy釋出之前(Google 2011年對外發布Snappy),採用的LZO演算法,目標是達到儘可能快的壓縮和解壓速度,同時減少對CPU的消耗;
在Snappy釋出之後,建議採用Snappy演算法(參考《HBase: The Definitive Guide》),具體可以根據實際情況對LZO和Snappy做過更詳細的對比測試後再做選擇。

Algorithm % remaining Encoding Decoding
GZIP 13.4% 21 MB/s 118 MB/s
LZO 20.5% 135 MB/s 410 MB/s
Zippy/Snappy 22.2% 172 MB/s 409 MB/s

如果建表之初沒有壓縮,後來想要加入壓縮演算法,可以通過alter修改schema

4. alter
使用方法:
如 修改壓縮演算法
disable ‘table’
alter ‘table’,{NAME=>’info’,COMPRESSION=>’snappy’}
enable ‘table’
但是需要執行major_compact ‘table’ 命令之後 才會做實際的操作。

5. TTL
預設是 2147483647 即:Integer.MAX_VALUE 值大概是68年
這個引數是說明該列族資料的存活時間,單位是s
這個引數可以根據具體的需求對資料設定存活時間,超過存過時間的資料將在表中不在顯示,待下次major compact的時候再徹底刪除資料
注意的是TTL設定之後 MIN_VERSIONS=>’0’ 這樣設定之後,TTL時間戳過期後,將全部徹底刪除該family下所有的資料,如果MIN_VERSIONS 不等於0那將保留最新的MIN_VERSIONS個版本的資料,其它的全部刪除,比如MIN_VERSIONS=>’1’ 屆時將保留一個最新版本的資料,其它版本的資料將不再儲存。

6. describe ‘table’ 這個命令查看了create table 的各項引數或者是預設值。

7. disable_all ‘toplist.*’ disable_all 支援正則表示式,並列出當前匹配的表的如下:
toplist_a_total_1001
toplist_a_total_1002
toplist_a_total_1008
toplist_a_total_1009
toplist_a_total_1019
toplist_a_total_1035

Disable the above 25 tables (y/n)? 並給出確認提示

8. drop_all 這個命令和disable_all的使用方式是一樣的

9. hbase 表預分割槽—-手動分割槽
預設情況下,在建立HBase表的時候會自動建立一個region分割槽,當匯入資料的時候,所有的HBase客戶端都向這一個region寫資料,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先建立一些空的regions,這樣當資料寫入HBase時,會按照region分割槽情況,在叢集內做資料的負載均衡。
命令方式:
create ‘t1’, ‘f1’, {NUMREGIONS => 15, SPLITALGO => ‘HexStringSplit’}
也可以使用api的方式:
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
引數:
test_table是表名
HexStringSplit 是split 方式
-c 是分10個region
-f 是family

可在UI上檢視結果,如圖:
這裡寫圖片描述

這樣就可以將表預先分為15個區,減少資料達到storefile 大小的時候自動分割槽的時間消耗,並且還有以一個優勢,就是合理設計rowkey 能讓各個region 的併發請求平均分配(趨於均勻) 使IO 效率達到最高,但是預分割槽需要將filesize 設定一個較大的值,設定哪個引數呢 hbase.hregion.max.filesize 這個值預設是10G 也就是說單個region 預設大小是10G
這個引數的預設值在0.90 到0.92到0.94.3各版本的變化:256M–1G–10G

 但是如果MapReduce Input型別為TableInputFormat 使用hbase作為輸入的時候,就要注意了,每個region一個map,如果資料小於10G 那隻會啟用一個map 造成很大的資源浪費,這時候可以考慮適當調小該引數的值,或者採用預分配region的方式,並將檢測如果達到這個值,再手動分配region。

1.2 hbase應用案例看行鍵設計

表結構設計
1、列族數量的設定
以使用者資訊為例,可以將必須的基本資訊存放在一個列族,而一些附加的額外資訊可以放在另一列族;
2、行鍵的設計
語音詳單:
13877889988-20150625
13877889988-20150625
13877889988-20150626
13877889988-20150626
13877889989
13877889989
13877889989
—-將需要批量查詢的資料儘可能連續存放
CMS系統—-多條件查詢
儘可能將查詢條件關鍵詞拼裝到rowkey中,查詢頻率最高的條件儘量往前靠
20150230-zhangsan-category…
20150230-lisi-category…
(每一個條件的值長度不同,可以通過做定長對映來提高效率)

參考:《hbase 實戰》—-詳細講述了facebook /GIS等系統的表結構設計

1.3 Hbase和mapreduce結合

為什麼需要用mapreduce去訪問hbase的資料?
——加快分析速度和擴充套件分析能力
Mapreduce訪問hbase資料作分析一定是在離線分析的場景下應用
這裡寫圖片描述

1.3.1 從Hbase中讀取資料、分析,寫入hdfs

/**
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
 * @author duanhaitao@itcast.cn
 *
 */
public class HbaseReader {

    public static String flow_fields_import = "flow_fields_import";
    static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            byte[] bytes = key.copyBytes();
            String phone = new String(bytes);
            byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
            String url = new String(urlbytes);
            context.write(new Text(phone + "\t" + url), NullWritable.get());

        }

    }

    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        Job job = Job.getInstance(conf);

        job.setJarByClass(HbaseReader.class);

//      job.setMapperClass(HdfsSinkMapper.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HdfsSinkReducer.class);

        FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }

}

1.3.2 從hdfs中讀取資料寫入Hbase

/**
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
 * @author duanhaitao@itcast.cn
 *
 */
public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";
    static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[0];
            String url = fields[1];

            FlowBean bean = new FlowBean(phone,url);

            context.write(bean, NullWritable.get());
        }
    }

    static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            Put put = new Put(key.getPhone().getBytes());
            put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());

            context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);

        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if(tableExists){
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
        desc.addFamily(hColumnDescriptor);

        hBaseAdmin.createTable(desc);


        Job job = Job.getInstance(conf);

        job.setJarByClass(HbaseSinker.class);

        job.setMapperClass(HbaseSinkMrMapper.class);
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);    
    }   
}

1.3 hbase高階程式設計

1.3.1 協處理器—- Coprocessor

協處理器有兩種:observer和endpoint
Observer允許叢集在正常的客戶端操作過程中可以有不同的行為表現
Endpoint允許擴充套件叢集的能力,對客戶端應用開放新的運算命令

 Observer協處理器
 正常put請求的流程:
這裡寫圖片描述
 加入Observer協處理後的put流程:
這裡寫圖片描述

1 客戶端發出put請求
2 該請求被分派給合適的RegionServer和region
3 coprocessorHost攔截該請求,然後在該表上登記的每個RegionObserver上呼叫prePut()
4 如果沒有被prePut()攔截,該請求繼續送到region,然後進行處理
5 region產生的結果再次被CoprocessorHost攔截,呼叫postPut()
6 假如沒有postPut()攔截該響應,最終結果被返回給客戶端

 Observer的型別
1.RegionObs——這種Observer鉤在資料訪問和操作階段,所有標準的資料操作命令都可以被pre-hooks和post-hooks攔截
**2.WALObserver——**WAL所支援的Observer;可用的鉤子是pre-WAL和post-WAL
3.MasterObserver——鉤住DDL事件,如表建立或模式修改

 Observer應用場景示例
見下節;

 Endpoint—參考《Hbase 權威指南》

1.3.2 二級索引

row key在HBase中是以B+ tree結構化有序儲存的,所以scan起來會比較效率。單表以row key儲存索引,column value儲存id值或其他資料 ,這就是Hbase索引表的結構。

由於HBase本身沒有二級索引(Secondary Index)機制,基於索引檢索資料只能單純地依靠RowKey,為了能支援多條件查詢,開發者需要將所有可能作為查詢條件的欄位一一拼接到RowKey中,這是HBase開發中極為常見的做法

比如,現在有一張1億的使用者資訊表,建有出生地和年齡兩個索引,我想得到一個條件是在杭州出生,年齡為20歲的按使用者id正序排列前10個的使用者列表。
有一種方案是,系統先掃描出生地為杭州的索引,得到一個使用者id結果集,這個集合的規模假設是10萬。然後掃描年齡,規模是5萬,最後merge這些使用者id,去重,排序得到結果。
這明顯有問題,如何改良?
保證出生地和年齡的結果是排過序的,可以減少merge的資料量?但Hbase是按row key排序,value是不能排序的。
變通一下——將使用者id冗餘到row key裡?OK,這是一種解決方案了,這個方案的圖示如下:
這裡寫圖片描述
merge時提取交集就是所需要的列表,順序是靠索引增加了_id,以字典序保證的。

2, 按索引查詢種類建立組合索引。
在方案1的場景中,想象一下,如果單索引數量多達10個會怎麼樣?10個索引,就要merge 10次,效能可想而知。
這裡寫圖片描述
解決這個問題需要參考RDBMS的組合索引實現。
比如出生地和年齡需要同時查詢,此時如果建立一個出生地和年齡的組合索引,查詢時效率會高出merge很多。
當然,這個索引也需要冗餘使用者id,目的是讓結果自然有序。結構圖示如下:
這裡寫圖片描述
這個方案的優點是查詢速度非常快,根據查詢條件,只需要到一張表中檢索即可得到結果list。缺點是如果有多個索引,就要建立多個與查詢條件一一對應的組合索引

而索引表的維護如果交給應用客戶端,則無疑增加了應用端開發的負擔
通過協處理器可以將索引表維護的工作從應用端剝離

 利用Observer自動維護索引表示例
在社交類應用中,經常需要快速檢索各使用者的關注列表t_guanzhu,同時,又需要反向檢索各種戶的粉絲列表t_fensi,為了實現這個需求,最佳實踐是建立兩張互為反向的表:
 一個表為正向索引關注表 “t_guanzhu”:
Rowkey: A-B
f1:From
f1:To
 另一個表為反向索引粉絲表:“t_fensi”:
Rowkey: B—A
f1:From
f1:To
插入一條關注資訊時,為了減輕應用端維護反向索引表的負擔,可用Observer協處理器實現:
這裡寫圖片描述

1、編寫自定義RegionServer

public class InverIndexCoprocessor extends BaseRegionObserver {

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // set configuration
        Configuration conf = HBaseConfiguration.create();
        // need conf.set...

        HTable table = new HTable(conf, "t_fensi");
        Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);
        Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);
        byte[] valueArray = fromCell.getValue();
        String from = new String(valueArray);
        valueArray = toCell.getValue();
        String to = new String(valueArray);

        Put putIndex = new Put((to+"-"+from).getBytes());
        putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());
        putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());

        table.put(putIndex);
        table.close();

    }
}

2、打成jar包“fensiguanzhu.jar”上傳hdfs
hadoop fs -put fensiguanzhu.jar /demo/

3、修改t_fensi的schema,註冊協處理器

hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.

4、檢查是否註冊成功
hbase(main):018:0> describe ‘ff’
DESCRIPTION ENABLED
‘ff’, {TABLE_ATTRIBUTES => {coprocessor$1 => ‘hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true
gdata.hbasecoprocessor.TestCoprocessor|1001|’}, {NAME => ‘f1’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMF
ILTER => ‘ROW’, REPLICATION_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0
‘, TTL => ‘2147483647’, KEEP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, B
LOCKCACHE => ‘true’}, {NAME => ‘f2’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMFILTER => ‘ROW’, REPLICATIO
N_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’, TTL => ‘2147483647’, KE
EP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, BLOCKCACHE => ‘true’}
1 row(s) in 0.0250 seconds

5、向正向索引表中插入資料進行驗證