1. 程式人生 > >Spark SQL 大資料處理

Spark SQL 大資料處理

InfoQ 上有學者對 Spark 的大資料處理,做了一些歸納演講
我嘗試著對這些演講做翻譯,加入了一些自己的理解和實驗
理解是我自己的,有可能是錯誤的,實驗是為了證明自己的理解是正確的

Big Data Processing with Apache Spark - Part 2 : Spark SQL

這是有關 Spark 大資料處理的第二講。
上一篇《為什麼要使用 Spark》是引言介紹 Spark, 這一篇講實際應用。

Spark 是一套計算框架

腳手架就那麼幾種,是想要摩天大樓,還是想要濠苑別墅,都基於 Spark 的基礎建設。
Spark SQL 就是一種對 Spark 基建的改造應用, Spark 的底子, SQL 的上層建築。
你照樣寫你的 SQL , Spark SQL 幫你翻譯成 Spark 底層應用的計算步驟,
完成分散式計算後,把結果彙總好了,給到你, Spark 是如何處理的,完全不用你操心

關於底層,唯一要注意的是,資料“底料”的格式,
蓋樓大概都是鋼筋水泥,或者木料
或者跟南京城牆一樣,用米糊壘磚,屹立幾百年不倒

如何體現出 Spark SQL 的優越性?

假設我們有 3 臺 Spark 計算機組成了 Spark SQL 叢集
還有一臺和 spark 節點一樣配置的 SQL Server/ Oracle
計算同樣的 3 億條資料的彙總,分別看看時間長短。

Select AVG(SalesAmount) From FctSales ;

假設 FctSales 有 30億條資料,每臺計算機的記憶體都是 1 G, CPU 1.3.
看看計算返回的時間,便可以知曉那種架構快了

資料底料,Spark SQL 連線的資料格式,除了 Hive, 還有 Json , Batch File等
可能最新的版本還會支援更多的格式,
比如 2018.02.28 發表的 Spark 2.3 就支援 Vectorized ORC Reader.

從 Spark 1.3 開始,之前的 SchemaRDD 等概念就有新的更新了:

*DataFrame: The new release provides a programming abstraction called DataFrames which can act as distributed SQL query engine.
Data Sources: With the addition of the data sources API, Spark SQL now makes it easier to compute over structured data stored in a wide variety of formats, including Parquet, JSON, and Apache Avro library.
JDBC Server: The built-in JDBC server makes it easy to connect to the structured data stored in relational database tables and perform big data analytics using the traditional BI tools.*

DataFrame 取代了 SchemaRDD, 成了新一代的分散式查詢引擎
但對於使用者來說這是透明的,我們還是用 SQL 來寫,DataFrame 自動幫我們完成解析 SQL 語句,網路通訊,抓取資料,彙總資料等操作

Data Source, 也就是資料底料,開始支援 Parquet, Avro , Json.
Json, 大家都很容易理解,平時用的也不少,就是JavaScript Object.

那麼 Parquet, Avro 是什麼,
為什麼要有這兩種格式的資料?
分別有什麼先進的地方,這些都留給我們去發現。

Apache Parquet , 是一種列式儲存格式的資料,以下是wikipedia 的說明:

Apache Parquet is a free and open-source column-oriented data store of the Apache Hadoop ecosystem. It is similar to the other columnar storage file formats available in Hadoop namely RCFile and Optimized RCFile.

所以 Apache Parquet 也是基於分散式的列式儲存

Apache Avro 就沒那麼簡單了:

*Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.
It is similar to Thrift and Protocol Buffers, but does not require running a code-generation program when a schema changes (unless desired for statically-typed languages).*

它可以將 Json 序列化成壓縮的二進位制格式,使得資料體積更小,更容易傳播和儲存。

最大化利用我們的 Spark SQL - Spark SQL JDBC Server

並不是每個使用者都會有 Spark SQL 的程式設計技巧
資料分析手段的使用,正在慢慢推進。
從一開始的 C/S 系統的報表工具, 到分析人員大量的 Excel 製作, 到 BI 報表的大量使用
每一個階段,都代表著資料分析這一生產力的提升。
而當大資料盛行的時候,我們同樣也需要升級我們的的分析工具
使其可以跟得上時代的進步,跨入大資料分析領域的分析

那麼有沒有辦法讓分析人員在 Excel, BI 報表上的技術積累,得以擴充套件到大資料分析呢?

設想我們只要在excel, BI 報表上增加一條 JDBC 的連線
就能連到 Spark SQL 上,執行常用的 SQL 來抓取資料,分析大資料了
這就是 Spark SQL JDBC Server!

Thrift JDBC Server 就是一個伺服器程序,屬於 Spark SQL 的一部分
當我們把 Thrift JDBC Server 啟動起來的時候,預設是開啟了本地的10000埠

start-thriftserver.sh

此時就允許多個客戶端來訪問我們的 Spark SQL 了

通過向本地的10000埠傳送 SQL 請求,Spark SQL 會將這些 SQL 請求轉譯成 Spark SchemaRDD, DataFrame 的執行命令,通過叢集管理器(YARN) 傳送到各個執行節點上執行。直到最終結果的返回!

下面是個簡單的 Java 連線 Spark SQL JDBC Server 的例子。
在這個例子裡,我們沒有使用任何 Spark 的庫
完全是和 Spark 環境獨立開來的應用,卻還能訪問 Spark SQL 利用它的平行計算
所以即使我們的應用程式是一個網站,只要能連線上Spark SQL JDBC Server
就能使用Spark SQL

package SparkSQL;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class Employee{
public static void main(String args[]) {
    try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    Connection conn = null ;
    try {
conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default","","");
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    System.out.println("Connected!");

    Statement stmt = null;
try {
stmt = conn.createStatement();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    String sql;
    sql = "SELECT count(*) as employeesCnt FROM employees";
    ResultSet rs = null;
try {
rs = stmt.executeQuery(sql);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    //STEP 5: Extract data from result set
    try {
while(rs.next()){
  //Retrieve by column name
  int cnt  = rs.getInt("employeesCnt");

  System.out.print("employeeCnt: " + cnt);

}
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

這裡概念需要特別清楚:

a 以上Spark SQL 其中的 SQL 指的就是 Hive.
b 這個 Thrift JDBC Server 就是一個 Hive Server. 可以接收客戶端對於 Hive 的請求
c 當我們建立 Java 連線 spark SQL 的應用,就是去連線 Hive Server.

那麼 Spark SQL JDBC Server 的意義在哪裡? Hive 也可以開啟 Remote Access, 同樣也是使用 Thrift Server. Spark SQL 其實是一類特殊的 SQL . 它支援常規的 SQL 語言,只是這類語言在別的語言裡呼叫時,傳送到 Spark SQL JDBC Server 上之後,會被 Spark 內建的分散式引擎翻譯為 Spark 命令,分配到各個 Spark 執行節點上執行。而 Spark SQL 支援的分散式庫,就有 Hive. 因此 Hive 的 HQL 語言是被 Spark SQL 支援的(HiveContext)。而這裡的 Spark SQL JDBC Server 不僅僅是 Hive Server , 而是一層 Spark編譯層.