Hands-On: Using HBase in a Java Web Project

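Until now we used MySQL as our data source, but the data has been growing too fast and, for several other reasons as well, we have to move to HBase. That means migrating the data in our MySQL tables into HBase. I won't explain what HBase is or argue about why to use it here. If you're interested, read on carefully; some of it should prove useful.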

There are three main steps:

  1. Create the HBase table.

  2. Migrate the MySQL data into HBase.

  3. Read the HBase data from the Java Web project.

First, the environment. HBase version: 0.98.8-hadoop2

Required dependency JARs:

commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.3.jar
guava-12.0.1.jar
hadoop-auth-2.5.0.jar
hadoop-common-2.5.0.jar
hbase-client-0.98.8-hadoop2.jar
hbase-common-0.98.8-hadoop2.jar
hbase-protocol-0.98.8-hadoop2.jar
htrace-core-2.04.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.7-bin.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
zookeeper-3.4.6.jar

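If your web project already contains some of these JARs, keep only one copy of each. Duplicate or conflicting versions on the classpath tend to cause strange errors that are tedious to track down.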

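Step 1: Create the table

Previously I had six business tables in MySQL, and their structures were highly repetitive. Here is the table structure I use in HBase: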

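Table name: kpi
Row key:    fid + tid + date

Family    Description
base      Basic information
gpower    Power generation metrics
userate   Availability
consum    Self-consumed energy
time      Cumulative running hours, maintenance hours, utilization hours

Family    Qualifier   Description
base      fid         Wind farm ID
base      tid         Turbine number
base      date        Date
gpower    power       Energy generated
gpower    windspeed   Wind speed
gpower    unpower     Curtailed wind energy
gpower    theory      Theoretical energy
gpower    coup        Coupling degree
userate   time        Fault time
userate   power       Energy lost to faults
userate   num         Number of turbine faults
consum    cpower      Self-consumed energy for the day
consum    gpower      Energy generated for the day
time      runtime     Grid-connected seconds for the day
time      checktime   Maintenance seconds for the day
time      usetime     Utilization seconds for the day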

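The table has five families. The base family corresponds to the key columns of the six MySQL tables; gpower, userate, and consum each correspond to one MySQL table; and time corresponds to three.

The row key of the kpi table is built from the three qualifiers in base, so data can be queried along three dimensions (wind farm, turbine, and date). This design is enough for our needs.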
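To make the row key design concrete, here is a minimal sketch of how such a key can be assembled and how the keys end up ordered. It uses only the Java standard library; the class name RowKeys and the farm IDs are made up for illustration, and the ":" separator and two-digit turbine padding follow the migration code in step 2. HBase orders rows by the bytes of the key, which for plain ASCII keys matches the string order a TreeSet produces.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.TreeSet;

/** Hypothetical helper that mirrors the fid:tid:date row key used in this article. */
class RowKeys {
    // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd.
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE;

    /** Builds "farmId:turbineId:yyyyMMdd", padding single-digit turbine numbers with a zero. */
    static String build(String farmId, int turbineId, LocalDate date) {
        String tid = (turbineId < 10 ? "0" : "") + turbineId;
        return farmId + ":" + tid + ":" + date.format(DAY);
    }

    public static void main(String[] args) {
        // A sorted set of ASCII strings stands in for HBase's byte-ordered rows.
        TreeSet<String> keys = new TreeSet<>();
        keys.add(build("F02", 1, LocalDate.of(2015, 1, 1)));
        keys.add(build("F01", 10, LocalDate.of(2015, 1, 1)));
        keys.add(build("F01", 2, LocalDate.of(2015, 1, 2)));
        keys.add(build("F01", 2, LocalDate.of(2015, 1, 1)));
        for (String key : keys) {
            System.out.println(key);
        }
    }
}
```

The keys print grouped by farm, then turbine, then date. The zero padding matters: without it, turbine "10" would sort before turbine "2", and a range of turbines would no longer be a contiguous range of rows.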

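For how to create the table in HBase and set up the environment, see my article 【手把手教你配置HBase完全分散式環境】 ("A Step-by-Step Guide to Configuring a Fully Distributed HBase Environment").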

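Step 2: Migrate the MySQL data into HBase

I'll use the MySQL table behind gpower as the demonstration; the other tables work the same way. (Some will ask why I don't use a third-party tool to migrate database to database directly. One answer for everyone: I don't know how, and I don't need to.)

Okay, let's look at the code first: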

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class GpowerTransfer {

    // ZooKeeper quorum hosts of your HBase cluster, separated by commas.
    private static final String QUORUM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
    private static final String CLIENT_PORT = "2181"; // ZooKeeper client port
    private static final Logger log = Logger.getLogger(GpowerTransfer.class);

    public static void main(String[] args) {
        BasicConfigurator.configure();
        log.setLevel(Level.DEBUG);
        String tableName = "kpi"; // HBase table name
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", QUORUM);
        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        try {
            // Windows workaround: without these odd lines a standalone program sometimes fails
            // on Windows; a web project can drop them. The likely cause is that Hadoop's shell
            // utilities look for bin/winutils.exe under hadoop.home.dir on Windows, so an empty
            // placeholder file is enough to get past the check.
            File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tableName)) {
                // Read the rows out of MySQL first, then insert them into HBase.
                Class.forName("com.mysql.jdbc.Driver");
                String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8";
                String username = "********";
                String password = "********";
                Connection con = DriverManager.getConnection(url, username, password);
                PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower");
                ResultSet rs = pstmt.executeQuery();
                HTable table = new HTable(conf, tableName);
                log.debug(tableName + ":start copying data to hbase...");
                List<Put> list = new ArrayList<Put>();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
                String base = "base";     // family name
                String gpower = "gpower"; // family name
                String[] qbase = {"fid", "tid", "date"}; // qualifier names
                String[] qgpower = {"power", "windspeed", "unpower", "theory", "coup"}; // qualifier names
                while (rs.next()) {
                    // Assemble the row key: farmid:turbineid (zero-padded to two digits):yyyyMMdd
                    String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid") < 10 ? ("0" + rs.getInt("turbineid")) : rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));
                    // Create one record, then assign a value to each column below.
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));             // base:fid
                    put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid") + ""));        // base:tid
                    put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime") + ""));           // base:date
                    put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value") + ""));      // gpower:power
                    put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed") + ""));  // gpower:windspeed
                    put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue") + ""));    // gpower:unpower
                    put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory") + ""));     // gpower:theory
                    put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup") + ""));       // gpower:coup
                    list.add(put);
                }
                // Release the JDBC resources once every row has been read.
                rs.close();
                pstmt.close();
                con.close();
                table.put(list); // this is where the rows are actually written to the table
                log.debug(tableName + ":completed data copy!");
                // Important: if you open and close the table frequently, performance drops
                // sharply (this may depend on the cluster).
                table.close();
            } else {
                admin.close();
                log.error("table '" + tableName + "' does not exist!");
                throw new IllegalArgumentException("table '" + tableName + "' does not exist!");
            }
            admin.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
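The program above collects every Put in a single list and writes it in one call, which is fine for a modest table but keeps the whole data set in memory. One common alternative is to write in fixed-size batches. The sketch below shows only the batching logic with the standard library; BatchWriter is a hypothetical helper, not an HBase class, and in the migration the sink would presumably be a call such as table.put(batch) with its IOException handled inside the lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items and hands them to a sink in fixed-size batches. */
class BatchWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchWriter(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and flushes automatically once the batch is full. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered to the sink; call once more after the last add. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // copy, so the sink may keep the list
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchWriter<String> writer = new BatchWriter<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            writer.add("row-" + i);
        }
        writer.flush(); // write the final, partially filled batch
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```

The batch size is a tuning knob: larger batches mean fewer round trips but more memory held at once.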

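When calling add on a Put, take special care with non-String data such as int, float, and Date: convert it to a String first (here I simply appended +""). Otherwise the reading code in step 3, which decodes every value as a UTF-8 string, will return garbage. Storing the binary form is also possible, but then the reader has to decode each value with the matching Bytes.toInt or Bytes.toFloat call instead of building a String.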
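The difference between the two encodings is easy to see without HBase at all. The following sketch uses java.nio.ByteBuffer as a stand-in for the binary form; it assumes that Bytes.toBytes(int) produces the same 4-byte big-endian layout, and the class name ValueEncoding is invented for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative only: compares a text encoding of an int with a fixed-width binary one. */
class ValueEncoding {
    /** Decimal text form, e.g. 42 becomes the two bytes '4' and '2'. */
    static byte[] asText(int v) {
        return String.valueOf(v).getBytes(StandardCharsets.UTF_8);
    }

    /** Fixed-width big-endian form, always 4 bytes. */
    static byte[] asBinary(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    static int fromText(byte[] b) {
        return Integer.parseInt(new String(b, StandardCharsets.UTF_8));
    }

    static int fromBinary(byte[] b) {
        return ByteBuffer.wrap(b).getInt();
    }

    public static void main(String[] args) {
        int tid = 42;
        System.out.println(fromText(asText(tid)));     // 42
        System.out.println(fromBinary(asBinary(tid))); // 42
        // Mixing the two fails: the binary bytes are 0,0,0,42, which read as
        // text give three NUL characters followed by '*'.
        String garbled = new String(asBinary(tid), StandardCharsets.UTF_8);
        System.out.println(garbled.length() + " chars, last is '" + garbled.charAt(3) + "'");
    }
}
```

Either encoding round-trips as long as the writer and the reader agree. Text is easier to inspect in the HBase shell, while the binary form has a fixed width.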

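Step 3: Read the HBase data in the Java Web project

OK, the data is now in HBase. What remains is to read it from the web application.

First make sure the required JARs have been added to the web project's classpath. Below is a small wrapper I wrote around the HBase connection setup: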

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * Small wrapper around the HBase client configuration.
 *
 * @author a01513
 */
public class HBaseConnector {

    // ZooKeeper quorum hosts of the HBase cluster, separated by commas.
    private static final String QUORUM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
    private static final String CLIENT_PORT = "2181"; // ZooKeeper client port
    private HBaseAdmin admin;
    private Configuration conf;

    /** Creates a new HBaseAdmin from the shared configuration; the caller should close it. */
    public HBaseAdmin getHBaseAdmin() {
        getConfiguration();
        try {
            admin = new HBaseAdmin(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return admin;
    }

    /** Builds the configuration on first use and reuses it afterwards. */
    public Configuration getConfiguration() {
        if (conf == null) {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", QUORUM);
            conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        }
        return conf;
    }
}

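This is essentially the same setup code as in the migration program. Since I need it in several other places, I put it in one class rather than repeating it.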
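Given the earlier observation that repeatedly opening and closing a table hurts performance, one option for expensive objects is to create them once and hand out the same instance afterwards. The sketch below shows that idea with the standard library only; Lazy is a hypothetical helper, not part of HBase, and whether a particular HBase object may be shared between threads is something to check in the HBase documentation before reusing it this way.

```java
import java.util.function.Supplier;

/** Hypothetical helper: creates a value on first use and returns the same instance afterwards. */
class Lazy<T> {
    private final Supplier<T> factory;
    private T value;
    private boolean created;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Thread-safe: the factory runs at most once, even with concurrent callers. */
    synchronized T get() {
        if (!created) {
            value = factory.get();
            created = true;
        }
        return value;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Lazy<StringBuilder> conf = new Lazy<>(() -> {
            calls[0]++; // count how often the expensive setup actually runs
            return new StringBuilder("conf");
        });
        System.out.println(conf.get() == conf.get()); // true
        System.out.println(calls[0]);                 // 1
    }
}
```

In a web application, a holder like this would typically live in a static field or an application-scoped bean, so that each request reuses the same configuration instead of rebuilding it.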

I ran a test in the service layer. Here is what the read path looks like:

private final String tableName = "kpi";
@Override
	public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) {
		List<GenPowerEntity> list = new ArrayList<GenPowerEntity>();
		HBaseConnector hbaseConn = new HBaseConnector();
		HBaseAdmin admin = hbaseConn.getHBaseAdmin();
		try {
			if(admin.tableExists(tableName)){
				HTable table = new HTable(hbaseConn.getConfiguration(), tableName);
				Scan scan = new Scan();
				scan.addFamily(Bytes.toBytes("base"));
				scan.addFamily(Bytes.toBytes("gpower"));
				scan.addFamily(Bytes.toBytes("userate"));
				String startRowKey = new String();
				String stopRowKey = new String();
				if("".equals(start) && !"".equals(end)){
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else if(!"".equals(start) && "".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					scan.setStartRow(Bytes.toBytes(startRowKey));
				}else if(!"".equals(start) && !"".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStartRow(Bytes.toBytes(startRowKey));
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else{
					table.close();
					admin.close();
					return null;
				}
				ResultScanner rsc = table.getScanner(scan);
				Iterator<Result> it = rsc.iterator();
				List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>();
				List<UseRateEntity> ulist = new ArrayList<UseRateEntity>();
				String tempRowKey = ""; // temporary row key, used to detect when all cells of one row have been read
				GenPowerEntity gpower = new GenPowerEntity();
				UseRateEntity userate = new UseRateEntity();
				while(it.hasNext()){
					for(Cell cell: it.next().rawCells()){
						String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8");
						String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8");
						String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8");
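						// If the int and float values had not been converted to String before being written to HBase,
						// this String would be garbage, and in my tests Bytes.toInt() did not bring the value back either, haha.
						String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");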
						System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value);
						if("".equals(tempRowKey))
							tempRowKey = rowKey;
						if(!rowKey.equals(tempRowKey)){
							slist.add(gpower);
							ulist.add(userate);
							gpower = null;
							userate = null;
							gpower = new GenPowerEntity();
							userate = new UseRateEntity();
							tempRowKey = rowKey;
						}
						switch(family){
						case "base":
							switch(qualifier){
							case "fid":
								gpower.setFarmid(value);
								userate.setFarmid(value);
								break;
							case "tid":
								gpower.setTurbineid(Integer.parseInt(value));
								userate.setTurbineid(Integer.parseInt(value));
								break;
							case "date":
								SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
								Date date = null;
								try {
									date = sdf.parse(value);
								} catch (ParseException e) {
									e.printStackTrace();
								}
								gpower.setVtime(date);
								userate.setVtime(date);
								break;
							}
							break;
						case "gpower":
							switch(qualifier){
							case "power":
								gpower.setValue(Float.parseFloat(value));
								break;
							case "windspeed":
								gpower.setWindspeed(Float.parseFloat(value));
								break;
							case "unpower":
								gpower.setUnvalue(Float.parseFloat(value));
								break;
							case "theory":
								gpower.setTvalue(Float.parseFloat(value));
								break;
							case "coup":
								gpower.setCoup(Float.parseFloat(value));
								break;
							}
							break;
						case "userate":
							switch(qualifier){
							case "num":
								userate.setFnum(Integer.parseInt(value));
								break;
							case "power":
								userate.setFpower(Float.parseFloat(value));
								break;
							case "time":
								userate.setFvalue(Float.parseFloat(value));
								break;
							}
							break;
						}
						
					}
				}
				rsc.close();
				table.close();
				admin.close();
				......
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return list;
	}

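This is a method I used for testing in the service layer; you can ignore the business logic (I have already replaced it with ......, haha). That completes all our work. For more advanced uses, you will have to dig in and study on your own.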
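One detail of the scan in getGenPower is worth seeing concretely. A start/stop row scan returns every key in the half-open range from the start row (inclusive) to the stop row (exclusive), compared as bytes. With a composite key such as fid:tid:date, that is not the same as filtering turbines and dates independently. The sketch below simulates the range with a TreeSet; the class ScanRange and the sample keys are invented, and the dates are assumed to be in the same yyyyMMdd form as the row key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Illustrative only: simulates which row keys a start/stop row scan would cover. */
class ScanRange {
    /** Returns the keys in [startRow, stopRow): start inclusive, stop exclusive. */
    static List<String> scan(TreeSet<String> keys, String startRow, String stopRow) {
        return new ArrayList<>(keys.subSet(startRow, true, stopRow, false));
    }

    public static void main(String[] args) {
        // Three turbines of farm F01, three days each.
        TreeSet<String> keys = new TreeSet<>();
        for (String tid : new String[] {"01", "02", "03"}) {
            for (String day : new String[] {"20150101", "20150102", "20150103"}) {
                keys.add("F01:" + tid + ":" + day);
            }
        }
        // Turbines 01 to 03, dates 20150102 to 20150103, built the same way as in getGenPower.
        for (String key : scan(keys, "F01:01:20150102", "F01:03:20150103")) {
            System.out.println(key);
        }
    }
}
```

The output contains F01:02:20150101, which lies outside the requested dates, and omits F01:03:20150103, because the stop row is exclusive. If exact date bounds matter, the rows still need a date check after the scan, or the stop key has to be built so that it sorts just past the last wanted row.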