1. 程式人生 > >利用phoenix進行Hbase資料訪問

利用phoenix進行Hbase資料訪問

Hadoop HBase Hlive 

一、背景

近期一個使用者畫像的專案,資料量龐大,用MySQL進行存取不太現實,所以採用Hbase叢集的方案來實施。由於業務層使用的是PHP,所以研發同學首先想到的是PHP-Thrift來訪問Hbase,編碼實驗了幾天,效果不是太理想,尤其是編碼成本較大,各種scan、filter之類的語法,不利於團隊進行快速開發;當然,最崩潰的還是想利用count進行資料總量計算,是Thrift裡,這個太難搞。

所以再換一個phoenix的方案,模擬SQL的形式進行Hbase資料訪問;不過這東西沒有PHP版本的,只有Hbasejar包支援,還有一個python版本的command line console

,開發過程中用來做資料檢視還是比較方便的。

二、環境部署

1、phoenix下載

2、部署jar包到Hbase叢集

# 下載phoenix
wget http://apache.fayea.com/phoenix/phoenix-4.7.0-HBase-1.1/bin/phoenix-4.7.0-HBase-1.1-bin.tar.gz
# 解壓
tar zxfv phoenix-4.7.0-HBase-1.1-bin.tar.gz > /dev/null
# 部署jar包到hbase
cp -r phoenix-4.7.0-HBase-1.1/*.jar /home/hbase/hbase-1.1.5/lib/
# 重啟Hbase
/home/hbase/hbase-1.1.5/bin/stop-hbase.sh /home/hbase/hbase-1.1.5/bin/start-hbase.sh

3、驗證phoenix安裝情況

cd /home/hbase/phoenix-4.7.0-HBase-1.1/bin
./sqlline.py localhost:2181

出現下圖所示的樣子,就算是安裝成功了:

1.png

敲擊 !help 命令,檢視內建命令:

0: jdbc:phoenix:localhost:2181> !help
!all                Execute the specified SQL against all the current connections
!autocommit         Set autocommit mode on or off
!batch              Start or execute a batch of statements
!brief              Set verbose mode off
!call               Execute a callable statement
!close              Close the current connection to the database
!closeall           Close all current open connections
!columns            List all the columns for
the specified table !commit Commit the current transaction (if autocommit is off) !connect Open a new connection to the database. !dbinfo Give metadata information about the database !describe Describe a table !dropall Drop all tables in the current database !exportedkeys List all the exported keys for the specified table !go Select the current connection !help Print a summary of command usage !history Display the command history !importedkeys List all the imported keys for the specified table !indexes List all the indexes for the specified table !isolation Set the transaction isolation for this connection !list List the current connections !manual Display the SQLLine manual !metadata Obtain metadata information !nativesql Show the native SQL for the specified statement !outputformat Set the output format for displaying results (table,vertical,csv,tsv,xmlattrs,xmlelements) !primarykeys List all the primary keys for the specified table !procedures List all the procedures !properties Connect to the database specified in the properties file(s) !quit Exits the program !reconnect Reconnect to the database !record Record all output to the specified file !rehash Fetch table and column names for command completion !rollback Roll back the current transaction (if autocommit is off) !run Run a script from the specified file !save Save the current variabes and aliases !scan Scan for installed JDBC drivers !script Start saving a script to a file !set Set a sqlline variable ......

4、檢視DB中已經存在的表

0: jdbc:phoenix:localhost:2181> !table

2.png

5、查看錶結構(隱藏列族名)

0: jdbc:phoenix:localhost:2181> !describe "xxx"

3.png

注意:phoenix/hbase對錶名、欄位名都是大小寫敏感,如果直接寫小寫字母,不加雙引號,則預設會被轉換成大寫字母。

6、查看錶內容

0: jdbc:phoenix:localhost:2181> select * from "xxx" ;

4.png

PhoenixSQL的語法跟MySQL語法沒多大區別,入門成本較低。注意,如果Hbase的表已經有了,則需要手動再在Phoenix中建立同名(注意雙引號括起來的大小寫)的Table。

三、開發

Phoenix提供的是Hbase的jar包支援,所以肯定是建立一個Java Web Project來提供API服務。

1、設計原則

  • 模擬Python版本Command line Console的操作,直接接受原生Phoenix-SQL作為引數進行處理
  • Phoenix DB不支援直接設定連線超時, 所以這裡使用執行緒池的方式來控制資料庫連線超時
  • SQL處理後的結果存放在一個PhoenixResultSet中,SQL本身不固定,所以結果欄位也不固定;所以這裡使用PhoenixResultSet.getMetaData()來獲取返回的欄位名
  • 上層應用一般不要求資料返回的型別,所以全部採用PhoenixResultSet.getString(index)的形式獲取字串型別欄位值
  • 最終資料編譯成JSON格式進行返回,藉助org.json.jar包來處理

2、編碼實現

1)、PhoenixClient.java

package com.qudian.bi;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

/**
 * 利用Phoenix訪問Hbase
 *
 * @author zhaoxianlie
 */
public class PhoenixClient {

           /**
            * 利用靜態塊的方式初始化Driver,防止Tomcat載入不到(有時候比較詭異)
            */
           static {
               try {
                   Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
               } catch (ClassNotFoundException e) {
                   e.printStackTrace();
               }
           }

           /**
            * 獲取一個Hbase-Phoenix的連線
            *
            * @param host
            *            zookeeper的master-host
            * @param port
            *            zookeeper的master-port
            * @return
            */
           private static Connection getConnection(String host, String port) {
               Connection cc = null;
               final String url = "jdbc:phoenix:" + host + ":" + port;

               if (cc == null) {
                   try {
                       // Phoenix DB不支援直接設定連線超時
                       // 所以這裡使用執行緒池的方式來控制資料庫連線超時
                       final ExecutorService exec = Executors.newFixedThreadPool(1);
                       Callable<Connection> call = new Callable<Connection>() {
                           public Connection call() throws Exception {
                               return DriverManager.getConnection(url);
                           }
                       };
                       Future<Connection> future = exec.submit(call);
                       // 如果在5s鍾之內,還沒得到 Connection 物件,則認為連線超時,不繼續阻塞,防止服務夯死
                       cc = future.get(1000 * 5, TimeUnit.MILLISECONDS);
                       exec.shutdownNow();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   } catch (ExecutionException e) {
                       e.printStackTrace();
                   } catch (TimeoutException e) {
                       e.printStackTrace();
                   }
               }
               return cc;
           }

           /**
            * 根據host、port,以及sql查詢hbase中的內容;根據phoenix支援的SQL格式,查詢Hbase的資料,並返回json格式的資料
            *
            * @param host
            *            zookeeper的master-host
            * @param port
            *            zookeeper的master-port
            * @param phoenixSQL
            *            sql語句
            * @return json-string
            * @return
            */
           public static String execSql(String host, String port, String phoenixSQL) {
               if (host == null || port == null || host.trim() == ""
                       || port.trim() == "") {
                   return "必須指定hbase master的IP和埠";
               } else if (phoenixSQL == null || phoenixSQL.trim() == "") {
                   return "請指定合法的Phoenix SQL!";
               }

               String result = "";
               try {
                   // 耗時監控:記錄一個開始時間
                   long startTime = System.currentTimeMillis();

                   // 獲取一個Phoenix DB連線
                   Connection conn = PhoenixClient.getConnection(host, port);
                   if (conn == null) {
                       return "Phoenix DB連線超時!";
                   }

                   // 準備查詢
                   Statement stmt = conn.createStatement();
                   PhoenixResultSet set = (PhoenixResultSet) stmt
                           .executeQuery(phoenixSQL);

                   // 查詢出來的列是不固定的,所以這裡通過遍歷的方式獲取列名
                   ResultSetMetaData meta = set.getMetaData();
                   ArrayList<String> cols = new ArrayList<String>();

                   // 把最終資料都轉成JSON返回
                   JSONArray jsonArr = new JSONArray();
                   while (set.next()) {
                       if (cols.size() == 0) {
                           for (int i = 1, count = meta.getColumnCount(); i <= count; i++) {
                               cols.add(meta.getColumnName(i));
                           }
                       }

                       JSONObject json = new JSONObject();
                       for (int i = 0, len = cols.size(); i < len; i++) {
                           json.put(cols.get(i), set.getString(cols.get(i)));
                       }
                       jsonArr.put(json);
                   }
                   // 耗時監控:記錄一個結束時間
                   long endTime = System.currentTimeMillis();

                   // 結果封裝
                   JSONObject data = new JSONObject();
                   data.put("data", jsonArr);
                   data.put("cost", (endTime - startTime) + " ms");
                   result = data.toString();
               } catch (SQLException e) {
                   e.printStackTrace();
                   return "SQL執行出錯:" + e.getMessage();
               } catch (JSONException e) {
                   e.printStackTrace();
                   return "JSON轉換出錯:" + e.getMessage();
               }
               return result;
           }

           /**
            * Just for phoenix test!
            * @param args
            */
           public static void main(String[] args) {
               String pheonixSQL = "select count(1) from \"t\"";
               String host = "localhost";
               if(args.length >= 1) {
                   host = args[0];
               }
               String result = PhoenixClient.execSql(host, "2181", pheonixSQL);
               System.out.println(result);
           }
}

2)、Servlet

public void doGet(HttpServletRequest request, HttpServletResponse response)
               throws ServletException, IOException {

           response.setContentType("application/json;charset=utf-8");
           PrintWriter out = response.getWriter();
           String host = request.getParameter("host");
           String port = request.getParameter("port");

           if (host == null || port == null || host.trim() == ""
                   || port.trim() == "") {
               ServletContext context = getServletContext();
               host = context.getInitParameter("hbase-master-ip");
               port = context.getInitParameter("hbase-master-port");
           }

           String phoenixSQL = request.getParameter("sql");
           String json = PhoenixClient.execSql(host, port, phoenixSQL);
           out.println(json);
           out.flush();
           out.close();
}

四、使用

所有SQL都需要進行urlencode / encodeURIComponent處理

1、查詢xxx表的記錄條數

# phoenix sql、做 url encode 處理
$sql = 'select count(1) from "xxx"';
$sql = urlencode($sql);

# 訪問下面介面獲取資料
$url = 'http://localhost:8080?host=localhost&port=2181&sql=' . $sql ;

返回的資料格式:

{
    "data": [
        {
            "COUNT(1)": "4"
        }
    ],
    "cost": "199 ms"
}

COUNT(1)作為欄位名感覺很奇怪,對應的SQL也可以改一下,加個別名,如:

$sql = 'select count(1) as "count" from "xxx"';

得到的結果為:

{
    "data": [
        {
            "count": "4"
        }
    ],
    "cost": "93 ms"
}

2、查詢表裡的所有資料(結果集太大就別這麼玩兒了)

$sql = 'select * from "xxx"';

得到的結果為:

{
    "data": [
        {
            "val3": "ehhhh",
            "ROW": "key1",
            "val1": "ehhhh",
            "val2": "ehhhh"
        },
        {
            "ROW": "key2",
            "val1": "hhhhh"
        },
        {
            "ROW": "key3",
            "val1": "hhhhh3"
        },
        {
            "ROW": "key4",
            "val1": "hhhhh4"
        }
    ],
    "cost": "19 ms"
}

3、只獲取某個欄位,且進行條件過濾

$sql = 'select ROW,"val1" from "xxx" where "val1"=\'hhhhh4\'';

得到結果集:

{
    "data": [
        {
            "ROW": "key3",
            "val1": "hhhhh3"
        }
    ],
    "cost": "24 ms"
}

其他的情況,就不舉例了。

五、總結

就完全可以把Phoenix當成MySQL來用,要想速度快,還是建立好索引再使用;在資料量龐大的情況下,有索引和沒索引,查詢速度是天壤之別的。

如果你也正好在玩兒這個東西,希望對你有幫助。

相關推薦

利用phoenix進行Hbase資料訪問

Hadoop HBase Hlive  一、背景 近期一個使用者畫像的專案,資料量龐大,用MySQL進行存取不太現實,所以採用Hbase叢集的方案來實施。由於業務層使用的是PHP,所以研發同學首先想到的是PHP-Thrift來訪問Hbase,編碼實驗了幾天,效果不是太

請教利用fegin進行遠程訪問設置Hystrix熔斷器不生效

size 控制 fin png schema work www sha fall 本人的環境:1.基於spring boot 2.0.4的 spring cloud(Finchley.SR1)2.分為eureka,merber,order。order通過Fegin的方式調用

Android利用Fiddler進行網路資料抓包

主要介紹Android及IPhone手機上如何利用Fiddler進行網路資料抓包,比如我們想抓某個應用(微博、微信、墨跡天氣)的網路通訊請求就可以利用這個方法。 Mac 下請使用 Charles 代替 Fiddler,Charles 免費啟用碼為Registered

iOS利用AFNetworking進行JSON資料解析

//初始化資料 AFHTTPRequestOperationManager *manager = [AFHTTPRequestOperationManagermanager];    NSDictionary *parameter = @{@"status": @"ok",@"userData":myA

利用Java進行網路資料包捕捉

http://jnetpcap.com/examples 在上一期的欄目中我們介紹了通過Fiddler嗅探Http協議網路資料包的方法,並且在文章最後通過開心農場的例子來展示網路嗅探的基本操作。但手工獲得資料畢竟耗時耗力,頗為麻煩,不妨將這個工作交給電腦,寫一個程式讓電腦

EF Core下利用Mysql進行數據存儲在並發訪問下的數據同步問題

sta 分享 AC point 解釋 evel post mysql數據庫 cor 小故事 在開始講這篇文章之前,我們來說一個小故事,純素虛構(真實的存錢邏輯並非如此) 小劉發工資後,趕忙拿著現金去銀行,準備把錢存起來,而與此同時,小劉的老婆劉嫂知道小劉的品性,知道他發

利用python進行資料分析(第二版) pdf下載

適讀人群 :適合剛學Python的資料分析師或剛學資料科學以及科學計算的Python程式設計者。 閱讀本書可以獲得一份關於在Python下操作、處理、清洗、規整資料集的完整說明。本書第二版針對Python 3.6進行了更新,並增加實際案例向你展示如何高效地解決一系列資料分析問題。你將在閱讀

利用Python進行資料分析》學習記錄

第8章249頁 原語句:party_counts = pd.crosstab(tips.day, tips.size) 現在的pandas似乎有個size屬性,就是計算資料的大小,而不會返回那一列具體的資料,比如這裡tips這個csv資料,其裡面包含一列size資料,現在來執行這句語句的話,

資料基礎---《利用Python進行資料分析·第2版》第12章 pandas高階應用

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 前面的章節關注於不同型別的資料規整流程和NumPy、pandas與其它庫的特點。隨著時間的發展,pandas發展出了更多適

資料基礎---《利用Python進行資料分析·第2版》第6章 資料載入、儲存與檔案格式

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 訪問資料是使用本書所介紹的這些工具的第一步。我會著重介紹pandas的資料輸入與輸出,雖然別的庫中也有不少以此為目的的工具

資料基礎---《利用Python進行資料分析·第2版》第4章 NumPy基礎:陣列和向量計算

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 NumPy(Numerical Python的簡稱)是Python數值計算最重要的基礎包。大多數提供科學計算的包都是用Nu

資料基礎---《利用Python進行資料分析·第2版》第11章 時間序列

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 時間序列(time series)資料是一種重要的結構化資料形式,應用於多個領域,包括金融學、經濟學、生態學、神經科學、物

資料基礎---《利用Python進行資料分析·第2版》第10章 資料聚合與分組運算

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 對資料集進行分組並對各組應用一個函式(無論是聚合還是轉換),通常是資料分析工作中的重要環節。在將資料集載入、融合、準備好之

資料基礎---《利用Python進行資料分析·第2版》第8章 資料規整:聚合、合併和重塑

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 在許多應用中,資料可能分散在許多檔案或資料庫中,儲存的形式也不利於分析。本章關注可以聚合、合併、重塑資料的方法。 首先

資料基礎---《利用Python進行資料分析·第2版》第7章 資料清洗和準備

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 在資料分析和建模的過程中,相當多的時間要用在資料準備上:載入、清理、轉換以及重塑。這些工作會佔到分析師時間的80%或更多。

資料基礎---《利用Python進行資料分析·第2版》第5章 pandas入門

之前自己對於numpy和pandas是要用的時候東學一點西一點,直到看到《利用Python進行資料分析·第2版》,覺得只看這一篇就夠了。非常感謝原博主的翻譯和分享。 pandas是本書後續內容的首選庫。它含有使資料清洗和分析工作變得更快更簡單的資料結構和操作工具。pandas經常和其它工

請教利用fegin進行遠端訪問設定Hystrix熔斷器不生效

本人的環境:1.基於spring boot 2.0.4的 spring cloud(Finchley.SR1)2.分為eureka,merber,order。order通過Fegin的方式呼叫merber的一個方法困惑:遠端呼叫 利用註解@HystrixCommand的方式熔斷器可以起作用, 但是利用這

分享 《利用Python進行資料分析(第二版)》高清中文版PDF+英文版PDF+原始碼

資料下載:https://pan.baidu.com/s/1K3DjJ9S1S3AxpacEElNF9Q 《利用Python進行資料分析(第二版)》【中文版和英文版】【高清完整版PDF】+【配套原始碼】 《利用Python進行資料分析(第二版)》中文和英文兩版對比學習, 高清完整版PDF,帶書籤,可複製貼

利用Python進行資料分析之第七章 記錄2 資料規整化:清理、轉換、合併、重塑

索引上的合併 DataFrame中傳入引數left_index=True或者right_index=True(或者兩個都傳入),表示DataFrame的index(索引)被用作兩個DataFrame連線的連線鍵,如下: dataframe1 = DataFrame({'key':

利用Python進行資料分析之第七章記錄 資料規整化:清理、轉換、合併、重塑

合併資料集: pandas物件中的資料可以通過一些內建的方式進行合併: pandas.merge可根據一個或多個鍵將不同DataFrame中的行連線起來。SQL或其它關係型資料庫的使用者對此應該會比較熟悉,因為它實現的就是資料庫的連線操作。 pandas.concat可以沿著一條軸將多個