
Big Data HBase (Part 4) --- rowkey design principles with simulated call logs, BloomFilter, Phoenix environment deployment, hive-hbase integration

I. rowkey design -- simulating call logs
--------------------------------------------------
    1. Create the table
        $hbase> create 'ns1:calllogs' , 'f1'

    2. Write the programs
        a. Caller-log writer class
           
           import org.apache.hadoop.conf.Configuration;
           import org.apache.hadoop.hbase.HBaseConfiguration;
           import org.apache.hadoop.hbase.TableName;
           import org.apache.hadoop.hbase.client.Connection;
           import org.apache.hadoop.hbase.client.ConnectionFactory;
           import org.apache.hadoop.hbase.client.Put;
           import org.apache.hadoop.hbase.client.Table;
           import org.apache.hadoop.hbase.util.Bytes;
           import org.junit.Before;
           import org.junit.Test;

           import java.io.IOException;
           import java.text.DecimalFormat;
           import java.text.SimpleDateFormat;
           import java.util.Date;

           /**
            * Test writing call logs
            */
           public class TsCallLogs {

               public Connection conn;
               public Table tb;

               @Before
               public void getConn() throws Exception {
                   // load the HBase configuration
                   Configuration conf = HBaseConfiguration.create();
                   // create the connection via the factory
                   conn = ConnectionFactory.createConnection(conf);
                   //get table
                   TableName tbName = TableName.valueOf("ns1:calllogs");
                   tb = conn.getTable(tbName);
               }


               /**
                * rowkey design: all of the commonly queried fields are encoded into the rowkey, and every field is kept fixed-length
                * partition[0-99] , 1_id[calling number] , time , flag[0/1 caller/callee] , 2_id[other number] , duration
                * partition[0-99] = (1_id + time[yyyyMM]).hash()  %   100[number of partitions]
                * @throws Exception
                */
               @Test
               public void tsPutLog() throws Exception {

                   String callerId = "13777777777";            // 1_id, calling number
                   String calledId = "13888888888";            // 2_id, called number
                   SimpleDateFormat sdf = new SimpleDateFormat();
                   sdf.applyPattern("yyyyMMddHHmmss");
                   String calledTime = sdf.format(new Date()); // call time
                   int isCaller = 0;                           // 0 = this is the caller-side record
                   int duration = 100;                         // call duration

                   // duration is formatted so that it is fixed-length
                   DecimalFormat df1 = new DecimalFormat();
                   df1.applyPattern("00000");
                   String durStr = df1.format(duration);

                   // compute the partition number [0-99]: assume 100 region servers [100 hosts]; hashing spreads the numbers across them
                   int hash = (callerId + calledTime.substring(0,6)).hashCode();
                   hash =  (hash & Integer.MAX_VALUE) % 100;  // masking keeps the hash non-negative

                   DecimalFormat df = new DecimalFormat();
                   df.applyPattern("00");
                   String hashStr = df.format(hash);

                   // assemble the rowkey ==> partition[0-99] , 1_id[calling number] , time , flag[0/1 caller/callee] , 2_id[other number] , duration
                   String rowKey = hashStr + "," + callerId + "," +calledTime + "," + isCaller + "," + calledId + "," + durStr;

                   // put the record
                   Put put = new Put(Bytes.toBytes(rowKey));
                   // add the column cells to the put
                   put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callerPos"), Bytes.toBytes("河北"));
                   put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("calledPos"), Bytes.toBytes("河南"));
                   tb.put(put);
                   System.out.println("put over");
               }
           }

        b. Callee-log writer class [a coprocessor/trigger] -- whenever a caller record is put, a mirrored callee record is added
           
            package ts.calllogs;

            import org.apache.hadoop.hbase.TableName;
            import org.apache.hadoop.hbase.client.Durability;
            import org.apache.hadoop.hbase.client.Put;
            import org.apache.hadoop.hbase.client.Table;
            import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
            import org.apache.hadoop.hbase.coprocessor.ObserverContext;
            import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
            import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
            import org.apache.hadoop.hbase.util.Bytes;

            import java.io.IOException;

            /**
             * Callee-log handler class
             * Whenever a caller record is put, a mirrored callee record is added.
             */
            public class TsCalledLogsRegionObserver extends BaseRegionObserver {

                @Override
                public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
                    super.postPut(e, put, edit, durability);

                    TableName tName = TableName.valueOf("ns1:calllogs");

                    TableName tName1 = e.getEnvironment().getRegion().getRegionInfo().getTable();

                    if (tName.equals(tName1)) {
                        String rowKey = Bytes.toString(put.getRow());
                        String [] strs = rowKey.split(",");
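                        // strs[3] == "1" means this is already a callee record: skip it so the put below does not re-trigger this observer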
                        if(strs[3].equals("1"))
                        {
                            return;
                        }
                        // e.g. 99,13777777777,20180925220228,1,13888888888,00100
                        String newKey = Util.getHash(strs[4],strs[2]) + "," + strs[4] + "," + strs[2] + ",1," + strs[1] + "," + strs[5];
                        // put the mirrored callee record
                        Put p = new Put(Bytes.toBytes(newKey));
                        p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("ccc"),Bytes.toBytes("nothing"));
                        Table tb = e.getEnvironment().getTable(tName);
                        tb.put(p);
                        System.out.println("put over");
                    }
                }
            }

        c. Log-printing class -- query the call records of a given number for a given month (this class and the coprocessor both rely on a Util.getHash helper; a sketch of it is given at the end of this section)
           
            package ts.calllogs;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.hbase.HBaseConfiguration;
            import org.apache.hadoop.hbase.TableName;
            import org.apache.hadoop.hbase.client.*;
            import org.apache.hadoop.hbase.util.Bytes;
            import org.junit.Before;
            import org.junit.Test;

            import java.io.IOException;
            import java.text.SimpleDateFormat;
            import java.util.Date;
            import java.util.NavigableMap;
            import java.util.Set;

            /**
             * Print call records
             */
            public class PrintLogs {

                public Connection conn;
                public Table tb;

                @Before
                public void getConn() throws Exception {
                    // load the HBase configuration
                    Configuration conf = HBaseConfiguration.create();
                    // create the connection via the factory
                    conn = ConnectionFactory.createConnection(conf);
                    //get table
                    TableName tbName = TableName.valueOf("ns1:calllogs");
                    tb = conn.getTable(tbName);
                }

                @Test
                public void printlogs() throws Exception {
                    Scan scan = new Scan();
                    String callerId = "13888888888";
                    String calledTime = "201809";               // call month (yyyyMM)
                    String hash = Util.getHash(callerId, calledTime);
                    String startKey = hash + "," + callerId + "," + calledTime;
                    String endKey = hash + "," + callerId + "," + "201810";
                    scan.setStartRow(Bytes.toBytes(startKey));
                    scan.setStopRow(Bytes.toBytes(endKey));
                    ResultScanner scanner = tb.getScanner(scan);
                    Result result = null;
                    while((result =  scanner.next()) != null) {
                        System.out.println(Bytes.toString(result.getRow()));
                    }
                }
            }


    3. Package and deploy
        a. Register the coprocessor and distribute the configuration to every HBase node (an alternative per-table registration is sketched at the end of this section)
            [hbase-site.xml]
            <property>
                <name>hbase.coprocessor.region.classes</name>
                <value>ts.calllogs.TsCalledLogsRegionObserver</value>
            </property>

        b. Distribute the built jar to the /hbase/lib directory on every node

    4. Run an insert test
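
    Both TsCalledLogsRegionObserver and PrintLogs call Util.getHash(...), which is not shown in the
    notes above. Below is a minimal sketch that mirrors the hashing logic of tsPutLog(); the class
    name Util and the method getHash come from those calls, while the 100-partition count and the
    two-digit padding are assumptions taken from that method:

           package ts.calllogs;

           import java.text.DecimalFormat;

           /**
            * Helper shared by the coprocessor and the print class.
            */
           public class Util {

               public static String getHash(String phoneId, String time) {
                   // only the yyyyMM prefix of the time takes part in the hash
                   int hash = (phoneId + time.substring(0, 6)).hashCode();
                   hash = (hash & Integer.MAX_VALUE) % 100;   // non-negative, 100 partitions
                   DecimalFormat df = new DecimalFormat();
                   df.applyPattern("00");
                   return df.format(hash);                    // e.g. "07" or "99"
               }
           }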

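    As an alternative to the global hbase-site.xml registration in step 3.a, the observer can be
    attached to just ns1:calllogs from the HBase shell; a sketch (the jar path is left empty here
    because step 3.b already places the jar in hbase/lib on every node):

        $hbase> alter 'ns1:calllogs', METHOD => 'table_att', 'coprocessor' => '|ts.calllogs.TsCalledLogsRegionObserver|1001|'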

II. BloomFilter
-----------------------------------------------------------------
    1. A Bloom filter can be specified for each column family when a table is created. There are three modes: NONE, ROW [rowkey; the default in recent HBase versions], ROWCOL [row and column]. (See the shell sketch at the end of this section.)

    2. When a client looks up a particular rowkey, the server would otherwise have to load every block to check whether it contains the key being searched for, which wastes a great deal of I/O.

    3. A Bloom filter can be used to avoid most of this wasted I/O.

    4. Principle: the Bloom filter quickly determines whether a StoreFile might contain a given rowkey. It returns one of two answers:
        -- NO: the key is definitely not in the file (100% accurate), so the file can be skipped.
        -- MAYBE: the key may be in the file; false positives are possible, so the file still has to be read.

    5. API demo
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.HColumnDescriptor;
        import org.apache.hadoop.hbase.HTableDescriptor;
        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.Admin;
        import org.apache.hadoop.hbase.client.Connection;
        import org.apache.hadoop.hbase.client.ConnectionFactory;
        import org.apache.hadoop.hbase.client.Table;
        import org.apache.hadoop.hbase.regionserver.BloomType;
        import org.junit.Before;
        import org.junit.Test;

        import java.io.IOException;

        /**
         * Test Bloom filters
         */
        public class TsBloomFilter {

            public Table tb;
            public Connection conn;

            @Before
            public void getConn() throws Exception {
                // load the HBase configuration
                Configuration conf = HBaseConfiguration.create();
                // create the connection via the factory
                conn = ConnectionFactory.createConnection(conf);
                //get table
                TableName tbName = TableName.valueOf("ns1:bloom");
                tb = conn.getTable(tbName);
            }


            @Test
            public void tsBloom() throws Exception {

                Admin admin = conn.getAdmin();
                TableName tableName = TableName.valueOf("ns1:bloom");
                HTableDescriptor desc = new HTableDescriptor(tableName);
                HColumnDescriptor hclo = new HColumnDescriptor("f1");
                hclo.setBloomFilterType(BloomType.ROW);
                desc.addFamily(hclo);
                admin.createTable(desc);
                System.out.println("over");
            }

        }
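
    The same table could also be created with its Bloom filter directly from the HBase shell; a
    minimal sketch using the table and family names from the API demo above:

        $hbase> create 'ns1:bloom', {NAME => 'f1', BLOOMFILTER => 'ROW'}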


III. Phoenix environment deployment
-------------------------------------------
    1. Install Phoenix
        a. Download apache-phoenix-4.10.0-HBase-1.2-bin.tar.gz
        b. Untar it
        c. Copy the xxx-server.jar into hbase/lib on the server side and distribute it to all nodes
        d. Restart HBase
        e. Use Phoenix
            $> phoenix/bin/sqlline.py s100   // note: this connects to the ZooKeeper server
            $phoenix> !tables                // list tables
            $phoenix> !help                  // show help
            $phoenix> !sql create table test (id varchar(20) primary key , name varchar(20))     // create a table
            $phoenix> !describe  test        // describe the table
            $phoenix> drop table test;       // drop the table
            $phoenix> select * from test;    // full table scan


    2. Install a SQL client [a GUI for running SQL]
        a. Download squirrel-sql-3.7.1-standard.jar; this jar is the installer and is executed to install the program.
        b. $> java -jar squirrel-sql-3.7.1-standard.jar
          $> Next ... (follow the installer)
        c. Copy phoenix-4.10.0-HBase-1.2-client.jar into the lib directory of the SQuirreL installation (c:\myprograms\squirrel).
        d. Start SQuirreL (GUI): go to the installation directory and run squirrel-sql.bat
        e. The GUI opens
        f. In the left-hand sidebar select the "Drivers" tab,
            click "+" and fill in ->
            Example URL     : jdbc:phoenix:192.168.43.131   (or jdbc:phoenix:s100, using the ZooKeeper host name)
            Driver class    : org.apache.phoenix.jdbc.PhoenixDriver
           (the same driver and URL also work from plain JDBC; see the sketch at the end of this section)
        g. Under "Aliases" create an alias that uses this driver and URL, pointing at the database to connect to

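    The driver class and URL above can also be used from plain Java over JDBC; a minimal sketch,
    assuming phoenix-4.10.0-HBase-1.2-client.jar is on the classpath, s100 is the ZooKeeper host,
    and the test table from step 1.e exists (the class name TsPhoenixJdbc is only for illustration):

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;

        public class TsPhoenixJdbc {
            public static void main(String[] args) throws Exception {
                // the URL points at ZooKeeper, exactly as with sqlline.py
                Connection conn = DriverManager.getConnection("jdbc:phoenix:s100");
                Statement st = conn.createStatement();
                ResultSet rs = st.executeQuery("select * from test");
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " : " + rs.getString(2));
                }
                rs.close();
                st.close();
                conn.close();
            }
        }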

IV. Using Phoenix and the SQL client
-------------------------------------------------------
    //create the table
    $jdbc:phoenix> create table IF NOT EXISTS test.Person (IDCardNum INTEGER not null primary key, Name varchar(20),Age INTEGER);

    //insert data
    $jdbc:phoenix> UPSERT INTO test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12);

    //delete data
    $jdbc:phoenix> delete from test.person where idcardnum = 1 ;

    //update data (UPSERT inserts a new row or overwrites the existing row with the same primary key)
    $jdbc:phoenix> upsert into test.PERSON(IDCardNum , Name,Age) VALUES (1,'tom',12);
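
    A couple of query examples against the rows upserted above (Phoenix folds unquoted identifiers
    to upper case, so test.PERSON and test.person name the same table):

    //query data
    $jdbc:phoenix> select * from test.PERSON;
    $jdbc:phoenix> select Name, Age from test.PERSON where IDCardNum = 1;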


V. hive-hbase integration: map an HBase table into Hive and query it with Hive SQL.
-----------------------------------------------------------------------
    1. Create an HBase-backed table from Hive
        $hive> CREATE TABLE t11(key string, name string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name")
        TBLPROPERTIES("hbase.table.name" = "ns1:t11");

    2. Operate on the HBase table from Hive
        $hive> select count(*) from t11 ;
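
    Note that the CREATE TABLE in step 1 also creates ns1:t11 in HBase and fails if that table
    already exists. To map an HBase table that already exists, Hive uses an external table instead;
    a sketch with the same column mapping (the name t11_ext is only for illustration):

        $hive> CREATE EXTERNAL TABLE t11_ext(key string, name string)
        STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name")
        TBLPROPERTIES("hbase.table.name" = "ns1:t11");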