1. 程式人生 > >【HBase】HBase各功能元件、整合MapReduce的方式及資料遷移

【HBase】HBase各功能元件、整合MapReduce的方式及資料遷移

1、HBase體系架構


各個功能元件闡述如下:
(1)Client

  1. 整個HBase叢集的訪問入口;
  2. 使用HBase RPC機制與HMaster和HRegionServer進行通訊;
  3. 與HMaster進行通訊進行管理類操作;
  4. 與HRegionServer進行資料讀寫類操作;
  5. 包含訪問HBase的介面,並維護cache來加快對HBase的訪問。

(2)Zookeeper

  1. 保證任何時候,叢集中只有一個HMaster;
  2. 存貯所有HRegion的定址入口;
  3. 實時監控HRegion Server的上線和下線資訊,並實時通知給HMaster;
  4. 儲存HBase的schema和table元資料;
  5. Zookeeper Quorum儲存Meta表地址、HMaster地址。

(3)HMaster

  1. HMaster沒有單點故障問題,HBase中可以啟動多個HMaster,通過Zookeeper的Master Election機制保證總有一個Master在執行,主要負責Table和Region的管理工作。
  2. 管理使用者對table的增刪改查操作;
  3. 管理HRegionServer的負載均衡,調整Region分佈;
  4. Region Split後,負責新Region的分佈;
  5. 在HRegionServer停機後,負責失效HRegionServer上Region遷移工作。

(4)HRegion Server

  1. 維護HRegion,處理對這些HRegion的IO請求,向HDFS檔案系統中讀寫資料;
  2. 負責切分在執行過程中變得過大的HRegion;
  3. Client訪問hbase上資料的過程並不需要master參與(定址訪問Zookeeper和HRegion Server,資料讀寫訪問HRegione Server),HMaster僅僅維護者table和Region的元資料資訊,負載很低。

(5)ZooKeeper

  1. HBase 依賴ZooKeeper;
  2. 預設情況下,HBase 管理 ZooKeeper 例項,比如, 啟動或者停止ZooKeeper;
  3. HMaster與HRegionServers 啟動時會向ZooKeeper註冊;
  4. Zookeeper的引入使得HMaster不再是單點故障。

2、HBase整合MapReduce

(1)新增環境變數

export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp` $HADOOP_HOME/bin/yarn jar $HADOOP_HOME/jars/hbase-mr-user2basic.jar

(2)例:HBase整合MapReduce,將user表中的部分資料匯出到basic表中。

User2BasicMapReduce.java

package com.beifeng.senior.hadoop.hbase;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2BasicMapReduce extends Configured implements Tool {
	
	// Mapper Class
	public static class ReadUserMapper extends TableMapper<Text, Put> {

		private Text mapOutputKey = new Text();

		@Override
		public void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
						throws IOException, InterruptedException {
			// get rowkey
			String rowkey = Bytes.toString(key.get());

			// set
			mapOutputKey.set(rowkey);

			// --------------------------------------------------------
			Put put = new Put(key.get());

			// iterator
			for (Cell cell : value.rawCells()) {
				// add family : info
				if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
					// add column: name
					if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
					// add column : age
					if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
						put.add(cell);
					}
				}
			}

			// context write
			context.write(mapOutputKey, put);
		}
	}

	// Reducer Class
	public static class WriteBasicReducer extends TableReducer<Text, Put, //
	ImmutableBytesWritable> {

		@Override
		public void reduce(Text key, Iterable<Put> values,
				Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
						throws IOException, InterruptedException {
			for(Put put: values){
				context.write(null, put);
			}
		}
	}

	// Driver
	public int run(String[] args) throws Exception {
		
		// create job
		Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
		
		// set run job class
		job.setJarByClass(this.getClass());
		
		// set job
		Scan scan = new Scan();
		scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
		scan.setCacheBlocks(false);  // don't set to true for MR jobs
		// set other scan attrs

		// set input and set mapper
		TableMapReduceUtil.initTableMapperJob(
		  "user",        // input table
		  scan,               // Scan instance to control CF and attribute selection
		  ReadUserMapper.class,     // mapper class
		  Text.class,         // mapper output key
		  Put.class,  // mapper output value
		  job //
		 );
		
		// set reducer and output
		TableMapReduceUtil.initTableReducerJob(
		  "basic",        // output table
		  WriteBasicReducer.class,    // reducer class
		  job//
		 );
		
		job.setNumReduceTasks(1);   // at least one, adjust as required
		
		// submit job
		boolean isSuccess = job.waitForCompletion(true) ;
		
		return isSuccess ? 0 : 1;
	}
	
	public static void main(String[] args) throws Exception {
		// get configuration
		Configuration configuration = HBaseConfiguration.create();
		
		// submit job
		int status = ToolRunner.run(configuration,new User2BasicMapReduce(),args) ;
		
		// exit program
		System.exit(status);
	}

}

3、將資料遷移進HBase

  1. 使用HBase Put API
  1. 使用HBase bulk load tool
  2. 使用自定義的MapReduce任務

(1)HBase Bulk Load工具
通常 MapReduce 在寫HBase時使用的是 TableOutputFormat 方式,在reduce中直接生成put物件寫入HBase,該方式在大資料量寫入時效率低下(HBase會block寫入,頻繁進行flush,split,compact等大量IO操作),並對HBase節點的穩定性造成一定的影響(GC時間過長,響應變慢,導致節點超時退出,並引起一系列連鎖反應)。
HBase支援 bulk load 的入庫方式,它是利用hbase的資料資訊按照特定格式儲存在hdfs內這一原理,直接在HDFS中生成持久化的HFile資料格式檔案,然後上傳至合適位置,即完成巨量資料快速入庫的辦法。配合mapreduce完成,高效便捷,而且不佔用region資源,增添負載,在大資料量寫入時能極大的提高寫入效率,並降低對HBase節點的寫入壓力。
通過使用先生成HFile,然後再BulkLoad到Hbase的方式來替代之前直接呼叫HTableOutputFormat的方法有如下的好處:

  1. 消除了對HBase叢集的插入壓力;
  2. 提高了Job的執行速度,降低了Job的執行時間。

Bulk Load的工作流程:

  1. mapreduce將*.cvs檔案轉換為hfile檔案;
  2. bulk loada將hfile檔案載入進HBase表中。

執行命令如下:

export HBASE_HOME=/opt/modules/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf \
	${HADOOP_HOME}/bin/yarn jar \
	${HBASE_HOME}/lib/hbase-server-0.98.6-hadoop2.jar importtsv \
	-Dimporttsv.columns=HBASE_ROW_KEY,\
	info:name,info:age,info:sex,info:address,info:phone \
	student \
	hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/hbase/importtsv

4、HBase表的增刪改查操作示例

HBaseOperation.java

package com.beifeng.senior.hadoop.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;

/**
 * CRUD Operations
 * 
 */
public class HBaseOperation {

	public static HTable getHTableByTableName(String tableName) throws Exception {
		// Get instance of Default Configuration
		Configuration configuration = HBaseConfiguration.create();

		// Get table instance
		HTable table = new HTable(configuration, tableName);

		return table;

	}

	public void getData() throws Exception {
		String tableName = "user"; // default.user / hbase:meta

		HTable table = getHTableByTableName(tableName);

		// Create Get with rowkey
		Get get = new Get(Bytes.toBytes("10002")); // "10002".toBytes()

		// ==========================================================================
		// add column
		get.addColumn(//
				Bytes.toBytes("info"), //
				Bytes.toBytes("name"));
		get.addColumn(//
				Bytes.toBytes("info"), //
				Bytes.toBytes("age"));

		// Get Data
		Result result = table.get(get);

		// Key : rowkey + cf + c + version
		// Value: value
		for (Cell cell : result.rawCells()) {
			System.out.println(//
					Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
							+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
							+ Bytes.toString(CellUtil.cloneValue(cell)));
		}

		// Table Close
		table.close();

	}

	/**
	 * 建議 tablename & column family -> 常量 , HBaseTableContent
	 * Map<String,Obejct>
	 * @throws Exception
	 */
	public void putData() throws Exception {
		String tableName = "user"; // default.user / hbase:meta

		HTable table = getHTableByTableName(tableName);

		Put put = new Put(Bytes.toBytes("10004"));

		// Add a column with value
		put.add(//
				Bytes.toBytes("info"), //
				Bytes.toBytes("name"), //
				Bytes.toBytes("zhaoliu")//
		);
		put.add(//
				Bytes.toBytes("info"), //
				Bytes.toBytes("age"), //
				Bytes.toBytes(25)//
		);
		put.add(//
				Bytes.toBytes("info"), //
				Bytes.toBytes("address"), //
				Bytes.toBytes("shanghai")//
		);

		table.put(put);

		table.close();
	}

	public void delete() throws Exception {
		String tableName = "user"; // default.user / hbase:meta

		HTable table = getHTableByTableName(tableName);

		Delete delete = new Delete(Bytes.toBytes("10004"));
		/*
		 * delete.deleteColumn(Bytes.toBytes("info"),//
		 * Bytes.toBytes("address"));
		 */
		delete.deleteFamily(Bytes.toBytes("info"));

		table.delete(delete);

		table.close();
	}

	public static void main(String[] args) throws Exception {
		String tableName = "user"; // default.user / hbase:meta

		HTable table = null;
		ResultScanner resultScanner = null;

		try {
			table = getHTableByTableName(tableName);

			Scan scan = new Scan();
			
			// Range
			scan.setStartRow(Bytes.toBytes("10001"));
			scan.setStopRow(Bytes.toBytes("10003")) ;
			
			// Scan scan2 = new Scan(Bytes.toBytes("10001"),Bytes.toBytes("10003"));
			
			// PrefixFilter
			// PageFilter
			// scan.setFilter(filter) ;
						
			// scan.setCacheBlocks(cacheBlocks);
			// scan.setCaching(caching);
						
			// scan.addColumn(family, qualifier)
			// scan.addFamily(family)

			resultScanner = table.getScanner(scan);

			for (Result result : resultScanner) {
				System.out.println(Bytes.toString(result.getRow()));
			//	System.out.println(result);
				for (Cell cell : result.rawCells()) {
					System.out.println(//
							Bytes.toString(CellUtil.cloneFamily(cell)) + ":" //
									+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " ->" //
									+ Bytes.toString(CellUtil.cloneValue(cell)));
				}
				System.out.println("---------------------------------------");
			}

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(resultScanner);
			IOUtils.closeStream(table);
		}
	}
}