大資料之hbase(四) --- rowkey設計原則模擬通話日誌,BloomFilter,Phoenix環境部署,hive-hbase整合
阿新 • • 發佈:2018-12-10
一、rowkey設計 -- 模擬通話日誌 -------------------------------------------------- 1.建表 $hbase> create 'ns1:calllogs' , 'f1' 2.編寫程式 a.編寫主叫日誌存放類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; /** * 測試通話日誌 */ public class TsCallLogs { public Connection conn; public Table tb; @Before public void getConn() throws Exception { //獲取配置檔案 Configuration conf = HBaseConfiguration.create(); //工廠類建立連線 conn = ConnectionFactory.createConnection(conf); //get table TableName tbName = TableName.valueOf("ns1:calllogs"); tb = conn.getTable(tbName); } /** * rowkey的設計:常用的主要指標,全部編寫進來,而且要保證定長 * 區域號[0-99] , 1_id[主號碼] , time , 標識[0/1 主叫/背叫] , 2_id[從屬號碼] , 時長 * 區域號[0-99] = (1_id + time[yyyyMM]).hash() % 100[區域數] * @throws Exception */ @Test public void tsPutLog() throws Exception { String callerId = "13777777777"; //1_id 主叫 String calledId = "13888888888"; //2_id 被叫 SimpleDateFormat sdf = new SimpleDateFormat(); sdf.applyPattern("yyyyMMDDHHmmss"); String calledTime = sdf.format(new Date()); //通話時間 int isCaller = 0; //主叫 int duration = 100; //通話時長 //為了保證定長duration需要被格式化 DecimalFormat df1 = new DecimalFormat(); df1.applyPattern("00000"); String durStr = df1.format(duration); //獲取區域號[0-99]:假設一共有100個區域伺服器[100臺主機],設計hash值,將號碼打散 int hash = (callerId + calledTime.substring(0,6)).hashCode(); hash = (hash & Integer.MAX_VALUE) % 100; //保證hash的非負 DecimalFormat df = new DecimalFormat(); df.applyPattern("00"); String hashStr = df.format(hash); //拼接rowkey ==> 區域號[0-99] , 1_id[主號碼] , time , 標識[0/1 主叫/背叫] , 2_id[從屬號碼] , 時長 String rowKey = hashStr + "," + callerId + "," +calledTime + "," + isCaller + "," + calledId + "," + durStr; //開始put資料 Put put = new 
Put(Bytes.toBytes(rowKey)); //add put column cile put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"), Bytes.toBytes("河北")); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calledPos"), Bytes.toBytes("河南")); tb.put(put); System.out.println("put over"); } }
b.編寫被叫日誌存放類[觸發器類] -- 當主叫被觸發,就往被叫裡面新增記錄
package ts.calllogs; import javafx.scene.control.Tab; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; /** * 被叫日誌處理類 * 當主叫被觸發,就往被叫裡面新增記錄 */ public class TsCalledLogsRegionObserver extends BaseRegionObserver { @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { super.postPut(e, put, edit, durability); TableName tName = TableName.valueOf("ns1:calllogs"); TableName tName1 = e.getEnvironment().getRegion().getRegionInfo().getTable(); if (tName.equals(tName1)) { String rowKey = Bytes.toString(put.getRow()); String [] strs = rowKey.split(","); if(strs[3].equals("1")) { return; } //99,13777777777,201809259220228,1,13888888888,00100 String newKey = Util.getHash(strs[4],strs[2]) + "," +strs[4] + "," + strs[2] + ",1," + "," +strs[1] + "," + strs[5]; //開始put資料 Put p = new Put(Bytes.toBytes(newKey)); p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("ccc"),Bytes.toBytes("nothing")); Table tb = e.getEnvironment().getTable(tName); tb.put(p); System.out.println("put over"); } } }
c.編寫列印日誌類 -- 查詢指定號碼指定日期的通話記錄
package ts.calllogs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.NavigableMap; import java.util.Set; /** * 列印通話記錄 */ public class PrintLogs { public Connection conn; public Table tb; @Before public void getConn() throws Exception { //獲取配置檔案 Configuration conf = HBaseConfiguration.create(); //工廠類建立連線 conn = ConnectionFactory.createConnection(conf); //get table TableName tbName = TableName.valueOf("ns1:calllogs"); tb = conn.getTable(tbName); } @Test public void printlogs() throws Exception { Scan scan = new Scan(); String callerId = "13888888888"; String calledTime = "201809"; //通話時間 String hash = Util.getHash(callerId, calledTime); String startKey = hash + "," + callerId + "," + calledTime; String endKey = hash + "," + callerId + "," + "201810"; scan.setStartRow(Bytes.toBytes(startKey)); scan.setStopRow(Bytes.toBytes(endKey)); ResultScanner scanner = tb.getScanner(scan); Result result = null; while((result = scanner.next()) != null) { System.out.println(Bytes.toString(result.getRow())); } } }
3.打包部署 a.註冊協處理器,並分發到所有hbase節點 [hbase-site.xml] <property> <name>hbase.coprocessor.region.classes</name> <value>ts.calllogs.TsCalledLogsRegionObserver</value> </property> b.將打好的jar包分發到所有節點的/hbase/lib目錄下 4.執行插入測試 二、BloomFilter 布隆過濾器 ----------------------------------------------------------------- 1.在建立表的時候可以指定布隆過濾器,共有三種模式:NONE[預設],ROW[rowkey],ROWCOL[row and column] 2.當用戶需要查詢特定的rowkey時,伺服器需要載入每一個塊來檢查是否包含要檢索的key,這就產生了極大的I/O資源的浪費 3.可以使用布隆過濾器來避免這種io的浪費。 4.原理就是:布隆過濾器可以快速準確的檢測出,一個storefile中包不包含指定的rowkey.布隆過濾器會返回兩種檢索結果 -- NO--不包含,明確指出,塊中沒有,準確率100%。 -- MayBe-- 包含。塊中可能有,準確度99% 5.API演示
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
 * Creates table ns1:bloom with a ROW-mode bloom filter on family f1.
 *
 * Bloom modes: NONE (default), ROW (keyed on rowkey), ROWCOL (rowkey+column).
 * ROW lets a get skip StoreFiles that definitely do not contain the key,
 * saving block loads.
 */
public class TsBloomFilter {

    public Table tb;
    public Connection conn;

    @Before
    public void getConn() throws Exception {
        // Configuration comes from hbase-site.xml on the classpath.
        Configuration conf = HBaseConfiguration.create();
        conn = ConnectionFactory.createConnection(conf);
        // NOTE(review): this fetches "ns1:bloom" before tsBloom() has created
        // it — getTable() does not verify existence, but any use of `tb` will
        // fail until the table exists. Confirm the intended setup order.
        TableName tbName = TableName.valueOf("ns1:bloom");
        tb = conn.getTable(tbName);
    }

    @Test
    public void tsBloom() throws Exception {
        // FIX: Admin was never closed; try-with-resources releases it even if
        // createTable throws.
        try (Admin admin = conn.getAdmin()) {
            TableName tableName = TableName.valueOf("ns1:bloom");
            HTableDescriptor desc = new HTableDescriptor(tableName);
            HColumnDescriptor hclo = new HColumnDescriptor("f1");
            // ROW: filter indexed by rowkey only (cheaper than ROWCOL).
            hclo.setBloomFilterType(BloomType.ROW);
            desc.addFamily(hclo);
            admin.createTable(desc);
        }
        System.out.println("over");
    }
}
三、Phoenix環境部署 ------------------------------------------- 1.安裝Phoenix a.下載apache-phoenix-4.10.0-HBase-1.2-bin.tar.gz b.tar開 c.複製xxx-server.jar 到伺服器端 hbase/lib下的目錄,並且分發 d.重啟hbase e.使用Phoenix $> phoenix/bin/sqlline.py s100 //注意:連線的是zk伺服器 $phoenix> !tables //顯示表格 $phoenix> !help //檢視幫助 $phoenix> !sql create table test (id varchar(20) primary key , name varchar(20)) //建立表 $phoenix> !describe test //查看表結構 $phoenix> !drop test //刪除表 $phoenix> select * from test; //全表掃描 2.SQLClient安裝[介面操作sql] a.下載squirrel-sql-3.7.1-standard.jar,該檔案是安裝檔案,執行的安裝程式。 b.$>java -jar squirrel-sql-3.7.1-standard.jar $>下一步... c.複製phoenix-4.10.0-HBase-1.2-client.jar到SQuirreL安裝目錄的lib下(c:\myprograms\squirrel)。 d.啟動SQuirreL(GUI),定位安裝目錄->執行squirrel-sql.bat f.開啟GUI介面 g.在左側的邊欄選中"Drivers"選項卡, 點選 "+" -> URL : jdbc:phoenix:192.168.43.131 Driverclass : org.apache.phoenix.jdbc.PhoenixDriver jdbc:phoenix: s100 h.在Aliases下建立使用者,指定好連線的資料庫 四、使用Phoenix和SQLClient ------------------------------------------------------- //建表 $jdbc:phoenix> create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20),Age INTEGER); //插入資料 $jdbc:phoenix> UPSERT INTO test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12); //刪除資料 $jdbc:phoenix> delete from test.person where idcardnum = 1 ; //更新資料 $jdbc:phoenix> upsert into test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12); 五、hive-hbase整合:將hbase的表映射到hive上,使用hive的查詢語句。 ----------------------------------------------------------------------- 1.在hive下建立hbase的表 $hive> CREATE TABLE t11(key string, name string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name") TBLPROPERTIES("hbase.table.name" = "ns1:t11"); 2.在hive下操作hbase的表 $hive> select count(*) from t11 ;