HBase建表高階屬性,hbase應用案例看行鍵設計,HBase和mapreduce結合,從Hbase中讀取資料、分析,寫入hdfs,從hdfs中讀取資料寫入Hbase,協處理器和二級索引
1. Hbase高階應用
1.1建表高階屬性
下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性
1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式
布隆過濾可以每列族單獨啟用。
使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 對列族單獨啟用布隆。
Default = ROW 對行進行布隆過濾。
對 ROW,行鍵的雜湊在每次插入行時將被新增到布隆。
對 ROWCOL,行鍵 + 列族 + 列族修飾的雜湊將在每次插入行時新增到布隆
使用方法: create ‘table’,{BLOOMFILTER =>’ROW’}
啟用布隆過濾可以節省讀磁碟過程,可以有助於降低讀取延遲
2、 VERSIONS 預設是1 這個引數的意思是資料保留1個 版本,如果我們認為我們的資料沒有這麼大的必要保留這麼多,隨時都在更新,而老版本的資料對我們毫無價值,那將此引數設為1 能節約2/3的空間
使用方法: create ‘table’,{VERSIONS=>’2’}
附:MIN_VERSIONS => ‘0’是說在compact操作執行之後,至少要保留的版本
3、 COMPRESSION 預設值是NONE 即不使用壓縮
這個引數意思是該列族是否採用壓縮,採用什麼壓縮演算法
使用方法: create ‘table’,{NAME=>’info’,COMPRESSION=>’SNAPPY’}
建議採用SNAPPY壓縮演算法
HBase中,在Snappy釋出之前(Google 2011年對外發布Snappy),採用的LZO演算法,目標是達到儘可能快的壓縮和解壓速度,同時減少對CPU的消耗;
在Snappy釋出之後,建議採用Snappy演算法(參考《HBase: The Definitive Guide》),具體可以根據實際情況對LZO和Snappy做過更詳細的對比測試後再做選擇。
Algorithm | % remaining | Encoding | Decoding |
---|---|---|---|
GZIP | 13.4% | 21 MB/s | 118 MB/s |
LZO | 20.5% | 135 MB/s | 410 MB/s |
Zippy/Snappy | 22.2% | 172 MB/s | 409 MB/s |
如果建表之初沒有壓縮,後來想要加入壓縮演算法,可以通過alter修改schema
4. alter
使用方法:
如 修改壓縮演算法
disable ‘table’
alter ‘table’,{NAME=>’info’,COMPRESSION=>’snappy’}
enable ‘table’
但是需要執行major_compact ‘table’ 命令之後 才會做實際的操作。
5. TTL
預設是 2147483647 即:Integer.MAX_VALUE 值大概是68年
這個引數是說明該列族資料的存活時間,單位是s
這個引數可以根據具體的需求對資料設定存活時間,超過存過時間的資料將在表中不在顯示,待下次major compact的時候再徹底刪除資料
注意的是TTL設定之後 MIN_VERSIONS=>’0’ 這樣設定之後,TTL時間戳過期後,將全部徹底刪除該family下所有的資料,如果MIN_VERSIONS 不等於0那將保留最新的MIN_VERSIONS個版本的資料,其它的全部刪除,比如MIN_VERSIONS=>’1’ 屆時將保留一個最新版本的資料,其它版本的資料將不再儲存。
6. describe ‘table’ 這個命令查看了create table 的各項引數或者是預設值。
7. disable_all ‘toplist.*’ disable_all 支援正則表示式,並列出當前匹配的表的如下:
toplist_a_total_1001
toplist_a_total_1002
toplist_a_total_1008
toplist_a_total_1009
toplist_a_total_1019
toplist_a_total_1035
…
Disable the above 25 tables (y/n)? 並給出確認提示
8. drop_all 這個命令和disable_all的使用方式是一樣的
9. hbase 表預分割槽—-手動分割槽
預設情況下,在建立HBase表的時候會自動建立一個region分割槽,當匯入資料的時候,所有的HBase客戶端都向這一個region寫資料,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先建立一些空的regions,這樣當資料寫入HBase時,會按照region分割槽情況,在叢集內做資料的負載均衡。
命令方式:
create ‘t1’, ‘f1’, {NUMREGIONS => 15, SPLITALGO => ‘HexStringSplit’}
也可以使用api的方式:
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
引數:
test_table是表名
HexStringSplit 是split 方式
-c 是分10個region
-f 是family
可在UI上檢視結果,如圖:
這樣就可以將表預先分為15個區,減少資料達到storefile 大小的時候自動分割槽的時間消耗,並且還有以一個優勢,就是合理設計rowkey 能讓各個region 的併發請求平均分配(趨於均勻) 使IO 效率達到最高,但是預分割槽需要將filesize 設定一個較大的值,設定哪個引數呢 hbase.hregion.max.filesize 這個值預設是10G 也就是說單個region 預設大小是10G
這個引數的預設值在0.90 到0.92到0.94.3各版本的變化:256M–1G–10G
但是如果MapReduce Input型別為TableInputFormat 使用hbase作為輸入的時候,就要注意了,每個region一個map,如果資料小於10G 那隻會啟用一個map 造成很大的資源浪費,這時候可以考慮適當調小該引數的值,或者採用預分配region的方式,並將檢測如果達到這個值,再手動分配region。
1.2 hbase應用案例看行鍵設計
表結構設計
1、列族數量的設定
以使用者資訊為例,可以將必須的基本資訊存放在一個列族,而一些附加的額外資訊可以放在另一列族;
2、行鍵的設計
語音詳單:
13877889988-20150625
13877889988-20150625
13877889988-20150626
13877889988-20150626
13877889989
13877889989
13877889989
—-將需要批量查詢的資料儘可能連續存放
CMS系統—-多條件查詢
儘可能將查詢條件關鍵詞拼裝到rowkey中,查詢頻率最高的條件儘量往前靠
20150230-zhangsan-category…
20150230-lisi-category…
(每一個條件的值長度不同,可以通過做定長對映來提高效率)
參考:《hbase 實戰》—-詳細講述了facebook /GIS等系統的表結構設計
1.3 Hbase和mapreduce結合
為什麼需要用mapreduce去訪問hbase的資料?
——加快分析速度和擴充套件分析能力
Mapreduce訪問hbase資料作分析一定是在離線分析的場景下應用
1.3.1 從Hbase中讀取資料、分析,寫入hdfs
/**
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseReader {
public static String flow_fields_import = "flow_fields_import";
static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
byte[] bytes = key.copyBytes();
String phone = new String(bytes);
byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
String url = new String(urlbytes);
context.write(new Text(phone + "\t" + url), NullWritable.get());
}
}
static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseReader.class);
// job.setMapperClass(HdfsSinkMapper.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
job.setReducerClass(HdfsSinkReducer.class);
FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.waitForCompletion(true);
}
}
1.3.2 從hdfs中讀取資料寫入Hbase
/**
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseSinker {
public static String flow_fields_import = "flow_fields_import";
static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phone = fields[0];
String url = fields[1];
FlowBean bean = new FlowBean(phone,url);
context.write(bean, NullWritable.get());
}
}
static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{
@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
Put put = new Put(key.getPhone().getBytes());
put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());
context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
if(tableExists){
hBaseAdmin.disableTable(flow_fields_import);
hBaseAdmin.deleteTable(flow_fields_import);
}
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
desc.addFamily(hColumnDescriptor);
hBaseAdmin.createTable(desc);
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseSinker.class);
job.setMapperClass(HbaseSinkMrMapper.class);
TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);
FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);
job.waitForCompletion(true);
}
}
1.3 hbase高階程式設計
1.3.1 協處理器—- Coprocessor
協處理器有兩種:observer和endpoint
Observer允許叢集在正常的客戶端操作過程中可以有不同的行為表現
Endpoint允許擴充套件叢集的能力,對客戶端應用開放新的運算命令
Observer協處理器
正常put請求的流程:
加入Observer協處理後的put流程:
1 客戶端發出put請求
2 該請求被分派給合適的RegionServer和region
3 coprocessorHost攔截該請求,然後在該表上登記的每個RegionObserver上呼叫prePut()
4 如果沒有被prePut()攔截,該請求繼續送到region,然後進行處理
5 region產生的結果再次被CoprocessorHost攔截,呼叫postPut()
6 假如沒有postPut()攔截該響應,最終結果被返回給客戶端
Observer的型別
1.RegionObs——這種Observer鉤在資料訪問和操作階段,所有標準的資料操作命令都可以被pre-hooks和post-hooks攔截
**2.WALObserver——**WAL所支援的Observer;可用的鉤子是pre-WAL和post-WAL
3.MasterObserver——鉤住DDL事件,如表建立或模式修改
Observer應用場景示例
見下節;
Endpoint—參考《Hbase 權威指南》
1.3.2 二級索引
row key在HBase中是以B+ tree結構化有序儲存的,所以scan起來會比較效率。單表以row key儲存索引,column value儲存id值或其他資料 ,這就是Hbase索引表的結構。
由於HBase本身沒有二級索引(Secondary Index)機制,基於索引檢索資料只能單純地依靠RowKey,為了能支援多條件查詢,開發者需要將所有可能作為查詢條件的欄位一一拼接到RowKey中,這是HBase開發中極為常見的做法
比如,現在有一張1億的使用者資訊表,建有出生地和年齡兩個索引,我想得到一個條件是在杭州出生,年齡為20歲的按使用者id正序排列前10個的使用者列表。
有一種方案是,系統先掃描出生地為杭州的索引,得到一個使用者id結果集,這個集合的規模假設是10萬。然後掃描年齡,規模是5萬,最後merge這些使用者id,去重,排序得到結果。
這明顯有問題,如何改良?
保證出生地和年齡的結果是排過序的,可以減少merge的資料量?但Hbase是按row key排序,value是不能排序的。
變通一下——將使用者id冗餘到row key裡?OK,這是一種解決方案了,這個方案的圖示如下:
merge時提取交集就是所需要的列表,順序是靠索引增加了_id,以字典序保證的。
2, 按索引查詢種類建立組合索引。
在方案1的場景中,想象一下,如果單索引數量多達10個會怎麼樣?10個索引,就要merge 10次,效能可想而知。
解決這個問題需要參考RDBMS的組合索引實現。
比如出生地和年齡需要同時查詢,此時如果建立一個出生地和年齡的組合索引,查詢時效率會高出merge很多。
當然,這個索引也需要冗餘使用者id,目的是讓結果自然有序。結構圖示如下:
這個方案的優點是查詢速度非常快,根據查詢條件,只需要到一張表中檢索即可得到結果list。缺點是如果有多個索引,就要建立多個與查詢條件一一對應的組合索引
而索引表的維護如果交給應用客戶端,則無疑增加了應用端開發的負擔
通過協處理器可以將索引表維護的工作從應用端剝離
利用Observer自動維護索引表示例
在社交類應用中,經常需要快速檢索各使用者的關注列表t_guanzhu,同時,又需要反向檢索各種戶的粉絲列表t_fensi,為了實現這個需求,最佳實踐是建立兩張互為反向的表:
一個表為正向索引關注表 “t_guanzhu”:
Rowkey: A-B
f1:From
f1:To
另一個表為反向索引粉絲表:“t_fensi”:
Rowkey: B—A
f1:From
f1:To
插入一條關注資訊時,為了減輕應用端維護反向索引表的負擔,可用Observer協處理器實現:
1、編寫自定義RegionServer
public class InverIndexCoprocessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
// set configuration
Configuration conf = HBaseConfiguration.create();
// need conf.set...
HTable table = new HTable(conf, "t_fensi");
Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);
Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);
byte[] valueArray = fromCell.getValue();
String from = new String(valueArray);
valueArray = toCell.getValue();
String to = new String(valueArray);
Put putIndex = new Put((to+"-"+from).getBytes());
putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());
putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());
table.put(putIndex);
table.close();
}
}
2、打成jar包“fensiguanzhu.jar”上傳hdfs
hadoop fs -put fensiguanzhu.jar /demo/
3、修改t_fensi的schema,註冊協處理器
hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
4、檢查是否註冊成功
hbase(main):018:0> describe ‘ff’
DESCRIPTION ENABLED
‘ff’, {TABLE_ATTRIBUTES => {coprocessor$1 => ‘hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true
gdata.hbasecoprocessor.TestCoprocessor|1001|’}, {NAME => ‘f1’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMF
ILTER => ‘ROW’, REPLICATION_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0
‘, TTL => ‘2147483647’, KEEP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, B
LOCKCACHE => ‘true’}, {NAME => ‘f2’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMFILTER => ‘ROW’, REPLICATIO
N_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’, TTL => ‘2147483647’, KE
EP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, BLOCKCACHE => ‘true’}
1 row(s) in 0.0250 seconds
5、向正向索引表中插入資料進行驗證