1. 程式人生 > >hbase的java API使用詳解

hbase的java API使用詳解

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
HBase當中的核心API :13個

	HBaseConfiguration   配置資訊
	Connection
	HBaseAdmin/Admin     管理員
	
	HTable    整個表的抽象資訊
	HColumnDescriptor  列的描述資訊
	HTableDescriptor   列簇的描述資訊

	Put
	Delete
	Get
	Scan

	KeyValue
	Cell

	Result
	ResultScanner

 */
public class HBase_First {

	public static void main(String[] args) throws Exception {
		
		/**
		 * 第一步: 獲取連線
		 */
		// 建立一個可以用來管理hbase配置資訊的config物件
		Configuration config = HBaseConfiguration.create();
		// 設定當前的程式去尋找的hbase在哪裡
		config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181,hadoop04:2181");
		//建立和建立連線
		Connection con = ConnectionFactory.createConnection(config);
		// 根據連接獲取到一個管理員物件
		Admin admin = con.getAdmin();
		
		/**
		 * 第二步:通過連線進行資料庫的響應操作
		 */
		//檢查表是否存在,存在返回true
		boolean tableExists = admin.tableExists(TableName.valueOf("user_info_1"));
		System.out.println(tableExists);
		/**
		 * 第三步:關閉連線
		 */
		admin.close();
		con.close();
	}
}
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;


public class HBase_API {
	
	private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
	private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop04:2181";

	public static void main(String[] args) {
		
	}
	
	private static Connection con = null;
	
	private static Admin admin = null;
	
	private static Table table = null;
	
	@Before
	public void init(){
		// 建立一個可以用來管理hbase配置資訊的config物件
		Configuration config = HBaseConfiguration.create();
		// 設定當前的程式去尋找的hbase在哪裡
		config.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
		try {
			//建立和建立連線 
			con = ConnectionFactory.createConnection(config);
			// 根據連接獲取到一個管理員物件(做一個管理的操作如建立、刪除、查詢表用admin,針對表做具體的資料操作就通過con來獲取表的物件)
			admin  = con.getAdmin();
			//獲取專門用來進行資料處理的、代表某張表的一個 表物件table,用來操作user_info這個表
			table = con.getTable(TableName.valueOf("user_info"));
			
		} catch (Exception e) {
			System.out.println("獲取hbase連線失敗");
		}
	}
	/**
	 * 建立表的方法
	 */
	@Test
	public void createTable() throws Exception{
		//建立一個表名物件tn,表名為my_stu
		TableName tn = TableName.valueOf("my_stu");
		//建立一個表的描述物件HTableDescriptor(包含表名和列簇名),並且它的表名為tn物件的值--my_stu
		HTableDescriptor htd = new HTableDescriptor(tn);
		//建立一個列簇為"cf1"的列簇物件cf1,注意在建立表時候必須寫列簇
		HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
		//新增一個列簇物件cf1
		htd.addFamily(cf1);
		//建立一個表
		admin.createTable(htd);
		//如果有tn名字的表,那麼建立成功
		if(admin.tableExists(tn)){
			System.out.println("建立表成功");
		}else{
			System.out.println("建立表失敗");
		}
	}
	/**
	 * 輸出所有的表的列簇名字
	 */
	@Test
	public void listTables() throws Exception{
		
		//HTableDescriptor的物件包含表名和列簇名,獲取資料庫中的所有表的資訊
		HTableDescriptor[] listTables = admin.listTables();
		
		for(HTableDescriptor htd:  listTables){
			//輸出所有的表名
			System.out.print(htd.getTableName() + "\t");
			//獲取所有的列簇的一個集合
			HColumnDescriptor[] columnFamilies = htd.getColumnFamilies();
			for(HColumnDescriptor hcd : columnFamilies){
				//獲取當前列簇的名字getName()返回的是位元組型別的
				String name = Bytes.toString(hcd.getName());
//				String name = new String(hcd.getName())
				System.out.print(name + " ");
			}
			
			System.out.println();
		}
	}
	/**
	 * 刪除表
	 */
	@Test
	public void dropTable() throws Exception{
		//表名物件
		TableName tn = TableName.valueOf("my_stu");
		//檢查表是否有效,也就是表是否啟用,啟用返回true,停用返回false
		boolean tableEnabled = admin.isTableEnabled(tn);
		if(tableEnabled){//先判斷表的狀態比較穩妥,否則停用狀態再停用可能出問題
			//把tn錶停用
			admin.disableTable(tn);
		}
		//刪除my_stu表(先停用表再刪除表)
		admin.deleteTable(tn);
		//判斷tn表在不在資料庫裡面,如果在返回true
		if(admin.tableExists(tn)){
			System.out.println("刪除表失敗");
		}else{
			System.out.println("刪除表成功");
		}
	}
	
	/**
	 * 表示往某張表中進行資料的插入
	 * 
	 * 專門用來進行資料處理的、代表某張表的一個 表物件,就是 HTable 類的一個例項物件----table
	 */
	//向某張表中插入一條資料
	@Test
	public void putData() throws Exception{
		//建立一個put物件,因為要插入資料所以必須指定row key,現在僅僅是指定行健,沒有對應的keyvalue值
		Put put = new Put("rk01".getBytes());
		//指定列簇、列、值,現在要插入的一行資料就準備好了
		put.addColumn("base_info".getBytes(), "xx".getBytes(), "yy".getBytes());
		//往表裡插入資料,傳一個put物件插入一行記錄,穿多個put物件插入多行記錄
		table.put(put);
	}
	
	//向某張表中插入多條記錄
	@Test
	public void putDatas() throws Exception{
		Put put1 = new Put("rk03".getBytes());
		put1.addColumn("base_info".getBytes(), "xxx".getBytes(), "yy".getBytes());
		Put put2 = new Put("rk02".getBytes());
		put2.addColumn("base_info".getBytes(), "xxx".getBytes(), "yy".getBytes());
		
		List<Put> puts = new ArrayList();
		puts.add(put1);
		puts.add(put2);
		//.put()方法可以傳list的一堆Put物件,插入就用put()方法
		table.put(puts);
	}
	
	//刪除資料
	@Test
	public void deleteData() throws Exception{
		//在增刪改查row key都需要指定的,如果不寫addColumn()那麼就是刪除rk這一行的資料
		Delete delete = new Delete("rk02".getBytes());
		//刪除rk02這條記錄下的base_info列簇的xxx key
		delete.addColumn("base_info".getBytes(), "xxx".getBytes());
		//刪除就用delete()方法
		table.delete(delete);
	}
	
	//get查詢某個表中的資料
	@Test
	public void getData() throws Exception{
		//行鍵row key,如果不下addFamily()方法就是查詢一行的所有列簇下的所有cell的資訊
		Get get = new Get("baiyc_20150716_0005".getBytes());
		//新增一個列簇物件(用來查詢指定列簇下的cell資訊)
		get.addFamily("base_info".getBytes());
		//返回一個Result物件
		Result result = table.get(get);
		
		//返回所有查詢到的單元格cell
		List<Cell> cells = result.listCells();
		//迴圈拿cell
		for(Cell c : cells){
			//直接列印是看不懂的,cells本身就是個位元組陣列
			System.out.println(c.toString());
			//獲取到列簇資訊 
			String family = Bytes.toString(c.getFamily());
			//獲取到列key資訊
			String qualifier = Bytes.toString(c.getQualifier());
			//獲取到value值
			String value = Bytes.toString(c.getValue());
			//獲取時間戳
			long ts = c.getTimestamp();
			System.out.println(family + "\t" + qualifier + "\t" + value + "\t" + ts);
		}
	}
	
	//Scan查詢某個表中的資料
	@Test
	public void getResultScanner() throws Exception{
		//無參方法就是全部,不寫下面的限制範圍的方法就是全表掃描
		Scan scan  = new Scan();
		//查詢某些特定的列簇
//		scan.addFamily("base_info".getBytes());
		//查詢base_info列簇和name列的資訊
		scan.addColumn("base_info".getBytes(), "name".getBytes());
		//從哪個行鍵開始掃描
		scan.setStartRow("rk01".getBytes());
		//從哪個行鍵結束掃描
		scan.setStopRow("zhangsan_20150701_0004".getBytes());
		//上面的四條限制範圍都可以單獨或同時使用,且使用越多越嚴格
		
		ResultScanner scanner = table.getScanner(scan);
		//解析ResultScanner物件的資訊並且列印輸出,跟上面的很像
		for (Result result : scanner) {
			List<Cell> cells = result.listCells();
			for (int i = 0; i < cells.size(); i++) {
				Cell cell = cells.get(i);
				System.out.println(Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t" + Bytes.toString(cell.getQualifier())
				+ "\t" + Bytes.toString(cell.getValue()) + "\t" + cell.getTimestamp());
			}
		}
	}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;

import com.ghgj.hbase.util.HBasePrintUtil;

public class FliterTest {
	
	private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
	private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop04:2181";


	public static void main(String[] args) throws Exception {
		
		
		Configuration conf = HBaseConfiguration.create();
		conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
		Connection con = ConnectionFactory.createConnection(conf);
		Admin admin = con.getAdmin();
		HTable table = (HTable) con.getTable(TableName.valueOf("user_info"));
		Scan scan = new Scan();
		/**
		 * 在這兒給scan新增過濾器
		 * 
		 * 構造過濾器的時候,要注意這兩個引數的意義:
		 * 
		 * 第一個引數: 比較規則   
		 * 第二個引數: 比較器
		 */
		Filter filter1  = new RowFilter(CompareOp.GREATER, new BinaryComparator("rk03".getBytes()));
		Filter filter2  = new FamilyFilter(CompareOp.EQUAL, new BinaryComparator("base_info".getBytes()));
		
		/**
		 * 分頁過濾的構造引數就是 : 每一頁的總資料條數
		 * Filter pageFilter = new PageFilter(3,4);   ----  id : 9-12 
		 * 真正的分頁實現就只需要兩個引數:      pageIndex   pageSize
		 * 
		 * 	pageIndex: 第幾頁
		 *  
		 *  pageSzie :每頁大小
		 */
		Filter pageFilter = new PageFilter(3);
		Filter filter = new FilterList(filter1, filter2, pageFilter);
		scan.setFilter(filter);
		ResultScanner scanner = table.getScanner(scan);
		HBasePrintUtil.printResultScanner(scanner);
		admin.close();
		con.close();
	}
}
package com.ghgj.hbase.page;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

import com.ghgj.hbase.util.HBasePrintUtil;
import com.ghgj.hbase.util.HBaseUtil;

public class HBase_Page {
	
	public static void main(String[] args) throws Exception {
		
		
//		ResultScanner pageData = getPageData(3, 10);
//		ResultScanner pageData = getPageData(2, 2);
		
		
//		ResultScanner pageData1 = getPageData(1, 3);
//		ResultScanner pageData1 = getPageData("baiyc_20150716_0002", 3);
		
		
//		ResultScanner pageData2 = getPageData("rk01", 3);
		ResultScanner pageData2 = getPageData(3, 2000);
		
		HBasePrintUtil.printResultScanner(pageData2);
	}

	/**
	 * @param pageIndex 	第幾頁
	 * @param pageSize		每一頁的記錄總數
	 * @return
	 * 
	 * 負責編寫JS程式碼的前端人員。他們是不知道。怎麼傳入startRow 
	 */
	public static ResultScanner getPageData(int pageIndex, int pageSize) throws Exception{
		
		if(pageSize < 3 || pageSize > 15){
			pageSize = 5;
		}
		
		/**
		 * 當前這個程式碼的真實作用就是把:;
		 * 
		 * "baiyc_20150716_0001", 3
		 * 
		 * 轉換成:
		 * 
		 * 2, 3
		 * 
		 * 難點: 就是  pageIndex 轉成 startRow
		 */
		String startRow = getCurrentPageStartRow(pageIndex, pageSize);
		
		return getPageData(startRow, pageSize);
		
	}
	
	
	/**
	 * 當前這個方法的作用:
	 * 
	 * 	就是把   前端人員 穿送過來的    pageIndex  轉換成  startRow
	 * 
	 *  以方便呼叫底層最簡單的獲取一頁分頁資料的 方法: getPageData(startRow, pageSize)
	 * 
	 * @param pageIndex
	 * @param pageSize
	 * @return
	 */
	private static String getCurrentPageStartRow(int pageIndex, int pageSize) throws Exception {
		
		// 怎麼實現?
		
		
		// 如果 傳送過來的額  pageIndex 不合法。 預設返回 第一頁資料
		if(pageIndex <= 1){
			
			/*pageIndex == -1
					轉成了
			startRow == null*/
			
			return null;
		
		}else{
			
			// 從第二頁開始的所有資料。
			String startRow = null;
			
			
			for(int i = 1; i <= pageIndex - 1;  i++){
				
				// 第幾次迴圈,就是獲取第幾頁的資料
				ResultScanner pageData = getPageData(startRow, pageSize);
				
				// 獲取當前這一頁的最後rowkey
				Iterator<Result> iterator = pageData.iterator();
				Result result = null;
				while(iterator.hasNext()){
					result = iterator.next();
				}
				
				// 讓最後一個rowkey往後挪動一點位置,但是又不會等於下一頁的 startRow
				String endRowStr = new String(result.getRow());
				byte[] add = Bytes.add(endRowStr.getBytes(), new byte[]{ 0x00});
				String nextPageStartRowStr = Bytes.toString(add);
				
				// 
				startRow = nextPageStartRowStr;
			}
			
			return startRow;
		}
		
	}

	/**
	 * 描述:
	 * 
	 * 		從  startRow開始 查詢 pageSize 條資料
	 * 
	 * @param startRow
	 * @param pageSize
	 * @return
	 */
	public static ResultScanner getPageData(String startRow, int pageSize) throws IOException{
		
		
		Connection con = HBaseUtil.getConnection();
//		Admin admin = HBaseUtil.getAdmin();
		
		Table table = HBaseUtil.getTable("user_info");
		
		
		Scan scan = new Scan();
		
		// 設定起始行健搞定
		
		// 如果是第一頁資料, 所以  scan.setStartRow這句程式碼根本就沒有任何意義。。 不用設定即可
		if(!StringUtils.isBlank(startRow)){
			
			// 如果使用者不傳入 startRow, 或者傳入了一個 非法的 startRow, 還是按照規則  返回   第一頁資料
			scan.setStartRow(startRow.getBytes());
		}
		
		
		// 設定總資料條件
		Filter pageFilter = new PageFilter(pageSize);
		
		scan.setFilter(pageFilter);
		
		ResultScanner scanner = table.getScanner(scan);
		
		
		return scanner;
	}
}
package com.ghgj.hbase.mr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 	get 'stduent','rk01'   ====  Result
 * 
 *  需求:讀出所有的記錄(Result),然後提取出對應的 age 資訊
 *  
 *  mapper階段的  
 *  
 *  	輸入: 從hbase來
 *  
 *  		key :  rowkey     
 *      	value :  result
 *      
 *      	ImmutableBytesWritable, Result
 *  
 *      輸出: hdfs
 *      
 *      	key :  age
 *          value :    年齡值
 *          
 *  reducer階段:
 *  
 *  	輸入:
 *  
 *      	key :  "age"
 *          value :    年齡值 = 18
 *          
 *     輸出:
 *     
 *     		key: NullWritbale
 *     		value: 平均
 */
public class ReadDataFromHBaseToHDFSMR extends Configured implements Tool {

	public static void main(String[] args) throws Exception {

		int run = ToolRunner.run(new ReadDataFromHBaseToHDFSMR(), args);

		System.exit(run);
	}

	@Override
	public int run(String[] arg0) throws Exception {

		Configuration config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181");
		config.set("fs.defaultFS", "hdfs://myha01/");
		config.addResource("config/core-site.xml");
		config.addResource("config/hdfs-site.xml");
		System.setProperty("HADOOP_USER_NAME", "hadoop");

		Job job = Job.getInstance(config, "ReadDataFromHBaseToHDFSMR");
		job.setJarByClass(ReadDataFromHBaseToHDFSMR.class);

		
		
		// 從此開始,就是設定當前這個MR程式的各種job細節
		Scan scan  = new Scan();
		scan.addColumn("info".getBytes(), "age".getBytes());
		TableMapReduceUtil.initTableMapperJob(
				"student".getBytes(),	// 指定表名
				scan,   	// 指定掃描資料的條件
				ReadDataFromHBaseToHDFSMR_Mapper.class, 	// 指定mapper class
				Text.class,    	// outputKeyClass mapper階段的輸出的key的型別
				IntWritable.class, // outputValueClass mapper階段的輸出的value的型別
				job,
				false);		// job物件
		
		
		job.setReducerClass(ReadDataFromHBaseToHDFSMR_Reducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		
		
		/**
		 * 在當前的MR程式中。   輸入的資料是來自於 HBase,  按照常理來說,需要自定義一個數據讀取元件   讀 hbase
		 * 
		 * 但是:TableMapReduceUtil.initTableMapperJob 這個方法已經做了。!!!!!!
		 */
		
		FileOutputFormat.setOutputPath(job, new Path("/student/avgage_output2"));
		

		boolean isDone = job.waitForCompletion(true);
		return isDone ? 0 : 1;
	}
	
	
	public static class ReadDataFromHBaseToHDFSMR_Mapper extends TableMapper<Text, IntWritable>{
		
		Text outKey = new Text("age");
		
		/**
		 * key = 就是rowkey
		 * 
		 * value = 就是一個result物件
		 */
		@Override
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
			
			boolean containsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
			
			if(containsColumn){
				
				List<Cell> cells = value.getColumnCells("info".getBytes(), "age".getBytes());
				
				Cell cell = cells.get(0);
				
				byte[] cloneValue = CellUtil.cloneValue(cell);
				String age = Bytes.toString(cloneValue);
				
				context.write(outKey, new IntWritable(Integer.parseInt(age)));
			}
		}
	}
	
	public static class ReadDataFromHBaseToHDFSMR_Reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			
			
			int count = 0;
			int sum = 0;
			
			for(IntWritable iw : values){
				
				count++;
				sum += iw.get();
			}
			
			double  avgAge = sum * 1D / count;
			
			context.write(key, new DoubleWritable(avgAge));
		}
	}
}
package com.ghgj.hbase.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce1111.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.runner.Result;

/**
 * 需求:讀取HDFS上的資料。插入到HBase庫中
 * 
 * hbase.zookeeper.quorum == hadoop02:2181
 */
public class ReadHDFSDataToHBaseMR extends Configured implements Tool{

	@Override
	public int run(String[] arg0) throws Exception {
		
//		Configuration conf = new Configuration();
//		conf.set("fs.defaultFS", "hdfs://myha01/");
//		conf.addResource("config/core-site.xml");
//		conf.addResource("config/hdfs-site.xml");
		
		// config === HBaseConfiguration
		Configuration config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "hadoop02:2181,hadoop03:2181");
		config.set("fs.defaultFS", "hdfs://myha01/");
		config.addResource("config/core-site.xml");
		config.addResource("config/hdfs-site.xml");
		
		
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		Job job = Job.getInstance(config, "ReadHDFSDataToHBaseMR");
		job.setJarByClass(ReadHDFSDataToHBaseMR.class);
		
		job.setMapperClass(HBaseMR_Mapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		// 設定資料讀取元件
		job.setInputFormatClass(TextInputFormat.class);
		// 設定資料的輸出元件
//		job.setOutputFormatClass(cls);
//		TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job);
		TableMapReduceUtil.initTableReducerJob("student", HBaseMR_Reducer.class, job, null, null, null, null, false);
		
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Put.class);
		
//		FileInputFormat.addInputPath(job, new Path("E:\\bigdata\\hbase\\student\\input"));
		FileInputFormat.addInputPath(job, new Path("/student/input/"));
		
		boolean isDone = job.waitForCompletion(true);
		
		return isDone ? 0: 1;
	}

	public static void main(String[] args) throws Exception {
		
		int run = ToolRunner.run(new ReadHDFSDataToHBaseMR(), args);
		
		System.exit(run);
	}

	
	
	public static class HBaseMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
		
		/**
		 * 每次讀取一行資料
		 * 
		 * Put  : 構造一個put物件的時候,需要
		 * put 'stduent','95001','cf:name','liyong'
		 * 
		 * 
		 * name:huangbo
		 * age:18
		 * 
		 * name:xuzheng
		 * 
		 */
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {

			context.write(value, NullWritable.get());
			
		}
	}
	
	
	public static class HBaseMR_Reducer extends TableReducer<Text, NullWritable, NullWritable>{
		
		/**
		 * key  ===  95011,包小柏,男,18,MA
		 * 
		 * 95001:  rowkey
		 * 包小柏 : name
		 * 18 : age
		 * 男  : sex
		 * MA : department
		 * 
		 * column family :  cf
		 */
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			
			String[] split = key.toString().split(",");
			
			Put put = new Put(split[0].getBytes());
			
			put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
			put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
			put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
			put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());
			
			context.write(NullWritable.get(), put);
		}
	}
}