1. 程式人生 > >重溫大資料---Hbase架構進階

重溫大資料---Hbase架構進階

這一講主要是對Hbase JavaApi使用的介紹,程式設計還是挺簡單的,重點在於理解程式設計實現的過程。其次深入講解了Hbase的架構。以及Hbase如何實現資料的遷移。

Hbase Java API

Hbase提供了java開發的介面,可以使用java語言對Hbase資料庫進行操作。


  • jar包依賴 server client
  • 配置檔案匯入 hdfs-seite.xml core-site.xml hbase-site.xml

程式碼內容很簡單,我也打上了註釋就不多說了。

例項程式碼:

	package
com.beifeng.senior.hadoop.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.
Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.
hadoop.io.IOUtils; /** * CRUD Operations * * @author xianglei * */ public class HBaseOperation { public static HTable getHTableByTableName(String tableName) throws Exception { // 讀取預設配置檔案資訊 Configuration configuration = HBaseConfiguration.create(); //獲取表的例項 HTable table = new HTable(configuration, tableName); return table; } public void getData() throws Exception { String tableName = "user"; // 預設省略->命名框架+名稱:default.user / hbase:meta HTable table = getHTableByTableName(tableName); // 獲取一個有著Rowkey的Get 對應shell -->get 'user','10002','info:name' Get get = new Get(Bytes.toBytes("10002")); // get 查詢 指定( 列蔟 列名) get.addColumn(// Bytes.toBytes("info"), // Bytes.toBytes("name")); get.addColumn(// Bytes.toBytes("info"), // Bytes.toBytes("age")); // 獲取資料 Result result = table.get(get); // Key : rowkey + cf + c + version // Value: value // 列印 列蔟:列名->值 for (Cell cell : result.rawCells()) { System.out.println(// Bytes.toString(CellUtil.cloneFamily(cell)) + ":" // + Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" // + Bytes.toString(CellUtil.cloneValue(cell))); } // 關閉資源 table.close(); } /** * 建議 把tablename 和 columnfamily 定義為 常量 , 寫到一個常量工具類裡面 HBaseTableContent * * 對於插入的列名和值建議使用 Map<String,Obejct> 使用for進行add * * @throws Exception */ public void putData() throws Exception { String tableName = "user"; HTable table = getHTableByTableName(tableName); Put put = new Put(Bytes.toBytes("10004")); // put 'user','10004','info:name','zhaoliu' put.add(// Bytes.toBytes("info"), // Bytes.toBytes("name"), // Bytes.toBytes("zhaoliu")// ); put.add(// Bytes.toBytes("info"), // Bytes.toBytes("age"), // Bytes.toBytes(25)// ); put.add(// Bytes.toBytes("info"), // Bytes.toBytes("address"), // Bytes.toBytes("shanghai")// ); //放入資料 table.put(put); table.close(); } public void delete() throws Exception { String tableName = "user"; // default.user / hbase:meta HTable table = getHTableByTableName(tableName); Delete delete = new Delete(Bytes.toBytes("10004")); /* 刪除info列蔟下的address列的所有資料deleteColumn刪最新的deleteColumns刪除所有的 * delete.deleteColumn(Bytes.toBytes("info"),// * Bytes.toBytes("address")); */ // rowkey為10004行info列蔟所有資料 delete.deleteFamily(Bytes.toBytes("info")); table.delete(delete); table.close(); } public static void main(String[] args) throws Exception { String tableName = "user"; HTable table = null; ResultScanner resultScanner = null; try { table = getHTableByTableName(tableName); Scan scan = new Scan(); // Range 範圍掃描 包頭不包尾 scan.setStartRow(Bytes.toBytes("10001")); scan.setStopRow(Bytes.toBytes("10003")) ; // 或者這樣寫 //Scan scan2 = new Scan(Bytes.toBytes("10001"),Bytes.toBytes("10003")); // PrefixFilter // PageFilter //設定過濾器 會影響查詢速度 //scan.setFilter(filter) ; //資料讀出後快取到blockcache下一次讀就直接從快取拿 //每一次獲取多少列 //scan.setCacheBlocks(cacheBlocks); //scan.setCaching(caching); // 查詢哪些列哪些列蔟 //scan.addColumn(family, qualifier) //scan.addFamily(family) resultScanner = table.getScanner(scan); for (Result result : resultScanner) { // result裡面存的是cell System.out.println(Bytes.toString(result.getRow())); //System.out.println(result); for (Cell cell : result.rawCells()) { System.out.println(// Bytes.toString(CellUtil.cloneFamily(cell)) + ":" // + Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" // + Bytes.toString(CellUtil.cloneValue(cell))); } System.out.println("---------------------------------------"); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(resultScanner); IOUtils.closeStream(table); } } }

Hbase架構元件剖析


)

  • Client

    • 叢集訪問入口
    • 使用RPC機制與HM和HR進行通訊
    • 與HM進行通訊進行管理類操作
    • 與HR進行資料讀寫操作
    • 維護cache加快對Hbase的訪問
  • Zookeeper

    • 保證叢集只有一個HM,Hbase沒有單節點故障的概念,Hbase可以有多個HM但是隻有一個去管理
    • 儲存所有HR的定址入口 meta
    • 實時監控HS的上下線資訊,並且通知HM
    • 儲存Hbase的schema的table元資料
  • HMaster

    • Hbase沒有單節點故障的概念,Hbase可以有多個HM但是隻有一個去管理
    • 管理使用者對錶的操作
    • 管理HR的負載均衡,調整Region分佈
    • Region Split後,負責新的Region的分佈
    • HS失效後,負責失效的HS上的Region的遷移工作
  • HRegion Server

    • 維護region處理IO請求
    • 切分大Region
    • 客戶端訪問資料不需要HM參與,定址訪問ZK和HS,資料讀寫訪問HS,HM僅僅維護table和Region的元資料。

Hbase整合MapReduce


Hbase繼承所需的jar包

	/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-common-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/protobuf-java-2.5.0.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-client-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-hadoop-compat-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/hbase-protocol-0.98.6-hadoop2.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/high-scale-lib-1.1.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/zookeeper-3.4.5.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/guava-12.0.1.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/htrace-core-2.04.jar:/opt/modules/hbase-0.98.6-hadoop2/lib/netty-3.6.6.Final.jar

使用方法:

export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HBASE_HOME/lib/hbase-server-0.98.6-hadoop2.jar

 // 此jar下面自帶的
  CellCounter: Count cells in HBase table
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

TSV 格式
	tab分隔
	>> student.tsv
	1001	zhangsan	26	shanghai
CSV 格式
	逗號分隔
	>> student.csv
	1001,zhangsan,26,shanghai

completebulkload     ★ ★ ★ ★ ★ ★ 直接變成hfile
	file  csv
	  |
	hfile
	  |
	load

程式碼例項

package com.beifeng.senior.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2BasicMapReduce extends Configured implements Tool {
	
	// Mapper Class
	public static class ReadUserMapper extends TableMapper<Text, Put> {

		private Text mapOutputKey = new Text();

		@Override
		public void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
						throws IOException, InterruptedException {
			// get rowkey
			String rowkey = Bytes.toString(key.get());

			// set map的輸出key
			mapOutputKey.set(rowkey);

			// --------------------------------------------------------
			Put put = new Put(key.get());

			// iterator
			for (Cell cell : value.rawCells()) {
				// add family : info
				if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
					// add column: name
					if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
					// add column : age
					if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
				}
			}

			// context write
			context.write(mapOutputKey, put);
		}

	}

	// Reducer Class
	public static class WriteBasicReducer extends TableReducer<Text, Put, //
	ImmutableBytesWritable> {

		@Override
		public void reduce(Text key, Iterable<Put> values,
				Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
						throws IOException, InterruptedException {
			for(Put put: values){
			// 輸出已經定義好了
				context.write(null, put);
			}
		}

	}

	// Driver
	public int run(String[] args) throws Exception {
		
		// create job
		Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
		
		// set run job class
		job.setJarByClass(this.getClass());
		
		// set job
		Scan scan = new Scan();
		scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
		scan.setCacheBlocks(false);  // don't set to true for MR jobs
		// set other scan attrs

		// set input and set mapper
		TableMapReduceUtil.initTableMapperJob(
		  "user",        // input table
		  scan,               // Scan instance to control CF and attribute selection
		  ReadUserMapper.class,     // mapper class
		  Text.class,         // mapper output key
		  Put.class,  // mapper output value
		  job //
		 );
		
		// set reducer and output
		TableMapReduceUtil.initTableReducerJob(
		  "basic",        // output table
		  WriteBasicReducer.class,    // reducer class
		  job//
		 );
		
		job.setNumReduceTasks(1);   // at least one, adjust as required
		
		// submit job
		boolean isSuccess = job.waitForCompletion(true) ;
		
		
		return isSuccess ? 0 : 1;
	}
	
	
	public static void main(String[] args) throws Exception {
		// get configuration
		Configuration configuration = HBaseConfiguration.create();
		
		// submit job
		int status = ToolRunner.run(configuration,new User2BasicMapReduce(),args) ;
		
		// exit program
		System.exit(status);
	}

	}

執行:

export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar

Hbase 資料遷移


Hbase資料來至於Logs或者RDMS等。資料遷移的方式有

  • Put API
  • bilk load tool
  • MapReduce job

準備樣本資料

		10001	zhangsan35	male	beijing	0109876543
		10002	lisi	32	male	shanghia	0109876563
		10003	zhaoliu	35	female	hangzhou	01098346543
		10004	qianqi	35	male	shenzhen	01098732543
  1. importTSV

     export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
    
     export HADOOP_HOME=/opt/modules/hadoop-2.5.0
    
     HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar \
     ${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
     -Dimporttsv.columns=HBASE_ROW_KEY,\
     info:name,info:age,info:sex,info:address,info:phone \
     student \
     hdfs://masterm:8020/user/hbase/importtsv
    
  2. bulk load方式快速載入巨量資料

1、準備資料檔案
Bulk Load的第一步。會執行一個Mapreduce作業,當中使用到了HFileOutputFormat輸出HBase資料檔案:StoreFile。
HFileOutputFormat的作用在於使得輸出的HFile檔案能夠適應單個region。使用TotalOrderPartitioner類將map輸出結果分割槽到各個不同的key區間中,每一個key區間都相應著HBase表的region。
2、匯入HBase表
第二步使用completebulkload工具將第一步的結果檔案依次交給負責檔案相應region的RegionServer,並將檔案move到region在HDFS上的儲存資料夾中。一旦完畢。將資料開放給clients。
假設在bulk load準備匯入或在準備匯入與完畢匯入的臨界點上發現region的邊界已經改變,completebulkload工具會自己主動split資料檔案到新的邊界上。可是這個過程並非最佳實踐,所以使用者在使用時須要最小化準備匯入與匯入叢集間的延時,特別是當其它client在同一時候使用其它工具向同一張表匯入資料。

bulkload的優點

  • 消除了對Hbase叢集的插入壓力
  • 提高了Job的執行速度,降低了Job的執行時間

使用方法

	export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2

	export HADOOP_HOME=/opt/modules/hadoop-2.5.0

	HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
	        ${HADOOP_HOME}/bin/yarn jar \
	${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
	-Dimporttsv.columns=HBASE_ROW_KEY,\
	info:name,info:age,info:sex,info:address,info:phone \
	-Dimporttsv.bulk.output=hdfs://master:8020/user/hbase/hfileoutput \
	student2 \
	hdfs://master:8020/user/hbase/importtsv
	
	=======================================================================
	
	export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2

	export HADOOP_HOME=/opt/modules/hadoop-2.5.0

	HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
	${HADOOP_HOME}/bin/yarn jar \
	${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar \
	completebulkload \
	hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/hbase/hfileoutput \
	student2

由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些複製資料的操作。


總結

這一塊的東西挺多的。詳見官網Hbase,我還得去看看官網的內容再來總結。