Spark擁有一個龐大的、不斷增長的社群,還有在企業環境中不可或缺的生態系統。這些生態系統提供了不同生產環境案例所需的許多功能。一般來說,Spark應用做的是機器學習演算法、日誌聚合分析或者商務智慧相關的運算,因為它在許多領域都有廣泛的應用,包括商務智慧、資料倉庫、推薦系統、反欺詐等。

本文會介紹Spark核心社群開發的生態系統庫,以及ML/MLlib及Spark Streaming的Spark庫的具體用法,對於企業的各種用例及框架也進行了說明。

資料倉庫

對任何業務來說,資料分析都是一個核心環節。對分析型的應用來說,資料倉庫系統就是其核心繫統。Spark有眾多的框架和生態系統,所以它能作為核心元件為企業環境提供資料倉庫功能,如圖1所示。

圖1  Spark可以用作資料倉庫核心元件

當然,與其他現有的工具相比,Spark提供的功能有較大不同。SQL是很多資料分析師、資料科學家和工程師使用的細粒度資料分析方法。Spark也可以用作資料倉庫框架,支援SQL處理,名為SparkSQL。

Spark核心已經整合到其他分散式檔案系統中,例如HDFS、S3。如果你的業務資料本來就儲存在這樣的系統中,很容易將現有業務流程轉移到Spark環境,因為你只需要在資料儲存系統上啟動Spark叢集即可。針對開發人員,Spark還提供了一個友好的API,可以用資料科學家們喜愛的Python和R來訪問它。這個功能存在很長一段時間了。如果你習慣使用這些語言,那麼選擇Spark作為自己的資料倉庫引擎還是很容易的。

你可以使用熟悉的介面在Spark上處理更大的資料集。SparkSQL特有的介面是DataFrame(資料幀),這是受R語言啟發而引入的。建議使用這個介面來訪問結構化資料。我們將在下一節詳細介紹DataFrame。先來看一個純SQL介面。Spark大致提供了三種類型的DW(資料倉庫)功能:SparkSQL、DataFrame及Hive On Spark。如前所述,儘管DataFrame一開始是使用SparkSQL來開發的,但它與機器學習管道的關聯度更高。我們將把它與ML / MLlib放到一起介紹。本節介紹SparkSQL和Hive on Spark,重點關注怎樣配置叢集。在嘗試Spark的這些SQL功能之前,需要下載帶Hive profile(配置)的預編譯包,或者用Hive profile去構建這個包。如果你要自己建立,可以使用如下命令:

$ build/mvn -Pyarn -Phive -Phive-thriftserver \

                  -PHadoop-2.6.0 -DHadoop.version=2.6.0 \

                  -DskipTests clean package

一定要安裝Zinc,它是一個長時執行的伺服器程式,用於sbt的增量編譯。如果你用的是OS X系統,可以用命令brew install zinc來安裝它。

在執行這條命令後,可以得到一個帶有Hive類的Spark二進位制包。你或許會發現能用-P配置及-DHadoop.version環境變數輕鬆選擇Hadoop版本。最好依據Hadoop叢集及Hive功能選擇你所需要的版本。換句話說,如果想在Hadoop 2.7.0上執行Hive 0.13,可以使用如下命令:

$ build/mvn -Pyarn -Phive -Phive-thriftserver \

                  -PHadoop-2.7.0 -DHadoop.version=2.7.0 \

                  -DskipTests clean package

Hive on Spark

Hive是用於管理分散式儲存(例如HDFS)中的大型資料集的資料倉庫軟體。Hive一開始被開發來作為生成Hadoop MapReduce資料處理任務的簡單介面。Hive有很長的歷史,差不多跟Hadoop一樣悠久。之後,一個更靈活、可靠的框架Tez被引入進來,它曾試圖取代MapReduce框架。Apache Tez是一個比MapReduce更復雜的通用執行引擎。由於Tez旨在成為通用的執行引擎,如果正確地建立了執行計劃,我們就能用它作為SQL執行引擎。從Hive 1.1開始,Hive也支援將Spark作為查詢執行引擎。這意味著Hive目前支援三個執行引擎:Hadoop MapReduce、Tez和Spark。雖然Hive還沒有全部完成,仍然在開發過程中(詳情及進度可以檢視Hive-7292),但是現在Hive能充分利用Spark的速度及可靠性。下面是在本地機器上使用Hive on Spark的步驟。

首先,需要啟動Spark叢集。請注意,你必須下載不包含Hive JAR包的Spark版本。為了從Spark二進位制包中排除Hive JAR包,輸入下面的命令:

$ ./make-distribution.sh --name Hadoop2-without-hive \

                                  --tgz -Pyarn -PHadoop-2.6 \

                                  -Pparquet-provided

用這個命令可以編譯你自己的不含Hive JAR的Spark二進位制包。但是在YARN上啟動Spark叢集最簡單的方法是使用Spark目錄下的ec2指令碼:

$ ./ec2/spark-ec2 -key-pair=<your key pair name> \

                          -identity-file=<your key pair path> \

                          --region=us-east-1 --zone=us-east-1a \

                          -hadoop-major-version=yarn \

                          launch hive-on-spark-cluster

關於如何使用spark-ec2指令碼,可參考Spark官方文件(https://Spark. apache.org/docs/latest/ec2-scripts.html)。這個指令碼是為了在EC2例項上更容易地啟動Spark叢集而寫的。要使用它的話,需要有AWS賬戶,以及從AWS控制檯獲得AWS金鑰對。詳情請閱讀上述官方文件。

幾分鐘後,你就有一個執行在YARN上的Spark叢集了。這個叢集預設不含Hive。你需要在此Spark叢集上安裝Hive包。可以將Hive下載到Spark master伺服器上,然後通過Hive CLI(命令列介面)來啟動:

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/hive/hive-1.1.1/

apache-hive-1.1.1-bin.tar.gz

$ tar zxvf apache-hive-1.1.1-bin-tar.gz

$ cd apache-hive-1.1.1-bin.tar.gz

$ bin/hive

hive> set Spark.home=/location/to/SparkHome;

hive> set hive.execution.engine=Spark;

當你試著按照上述過程使用Hive on Spark的時候,可能會遇到麻煩。因為有一些情況下,當你自己啟動Hadoop叢集的時候,Hadoop和Hive的版本之間會發生衝突。所以,應該考慮使用CDH及HDP這樣的發行版,它們包含Spark和Hive,而且所有元件之間的相容性與功能都是經過測試的,這是最便捷的途徑。但是這些系統還在不斷髮展,並且元件間會有比較複雜的依賴關係,因此有必要了解元件間的依賴關係。

機器學習

在大資料領域的下一代資料處理技術中,機器學習扮演了重要角色。當收集大量的資料時,對系統性能會有顯著影響。這意味著,收集大量的關於處理能力的資料,可以使一個機器學習模型更加出色。通過提供一種簡單而通用的分散式框架,Hadoop及其生態系統實現了基本的環境(用大資料做機器學習)。Spark進一步推動了這種趨勢。所以,在本章中我們要關注的是,對機器學習演算法的使用和建立流程的一些具體工作。當然,對機器學習而言,Spark還有很多地方有待完善。但它的記憶體處理(on-memory processing)體系結構很適合解決機器學習問題。本節我們的下一個案例將重點看一看Spark中的ML(機器學習)。對開發者來說,機器學習本身需要一定的數學背景及複雜的理論知識,乍一看並不是那麼容易。只有具備一些知識和先決條件,才能在Spark上高效地執行機器學習演算法。

一些主要的機器學習概念包括:

  • DataFrame框架:它使建立及操作現實中的結構化資料更簡單。這個框架提供了一個先進的介面,有了它,我們就不用關心每一種機器學習演算法及其優化機制之間的差異。由於這種固定的資料模式(data schema),DataFrame能根據資料優化自己的工作負載。

  • MLlib和ML:整合到Spark內的核心機器學習框架。這些框架從本質上來說是Spark外部的框架,但是由於它們由Spark的核心提交者(committer)團隊所維護,它們是完全相容的,並且可以經由Spark核心無縫使用。

  • 其他可用於Spark的外部機器學習框架:包括Mahout及Hivemall。它們都支援目前的Spark引擎。在一些MLlib及ML無法滿足的情況下,可以選擇這些外部庫。

外部的框架

Spark社群提供了大量的框架和庫。其規模及數量都還在不斷增加。在本節中,我們將介紹不包含在Spark 核心原始碼庫的各種外部框架。Spark試圖解決的問題涵蓋的面很廣,跨越了很多不同領域,使用這些框架能幫助降低初始開發成本,充分利用開發人員已有的知識。

  • Spark Package:要使用Spark庫,你首先必須瞭解的東西是Spark package。它有點像Spark的包管理器。當你給Spark叢集提交job時,你可以到存放Spark package的網站下載任何package。所有package都存放在這個站點。

  • XGBoost:XGBoost是一個專用於分散式框架的優化庫。這個框架由DMLC(Distributed Machine Learning Community,分散式機器學習社群)開發。顧名思義,在DMLC專案下有許多機器學習庫,它們在Hadoop和Spark等已有資源上具有高擴充套件性。XGBoost是基於Gradient Boosting(梯度提升)演算法的。

  • spark-jobserver:提交job的流程需要改進,因為對於非工程師來說,這項工作有點難。你需要理解如何用命令列或者其他UNIX命令去提交Spark job。Spark專案現在是使用CLI來提交job的。spark-jobserver提供了一個RESTful API來管理提交到Spark叢集的job。因此,這意味著可以在企業內部環境中將Spark作為一個服務啟動。

未來的工作

你可能對使用Spark服務比較感興趣。Spark已經提供了很多功能,例如SQL執行、流處理以及機器學習。Spark也有一個好用的介面,而且背後有強大的社群,開發者十分活躍,這也是人們對Spark寄予厚望的原因。下面我們將介紹一些當前正在進行中的Spark專案。

Spark目前使用的主要資料結構是RDD和DataFrame。RDD是一個原創的概念,而DataFrame是後來引入的。RDD相對靈活。你可以在RDD結構上執行許多型別的轉換與計算。然而,因為它太靈活了,所以很難對其執行進行優化。另一方面,DataFrame有一定的固定結構,能利用它來優化DataFrame資料集上的執行。但是,它不具備RDD的優點,主要是沒有RDD的靈活性。RDD與DataFrame的主要區別如表2所示。

表2  RDD與DataFrame的區別

與引數伺服器整合

在介紹引數伺服器的實現之前,有必要釐清分散式機器學習的相關概念,例如並行。引數伺服器的目標與已有資料庫是不同的,它們為大規模機器學習而開發。在大規模機器學習中有兩種並行型別:資料並行(data parallelism)及模型並行(model parallelism)。

資料並行

資料並行側重於把資料分發到叢集不同的計算資源上。通常,用於機器學習的訓練資料量非常龐大,僅僅單臺節點機器在記憶體中是無法儲存所有資料的,甚至在磁碟上也無法儲存全部的資料。這是一種SIMD(單指令多資料流)處理型別。包括Spark MLlib及ML在內的大多數分散式機器學習框架都實現了資料並行。雖然資料並行很簡單且易於實現,但是資料並行的收集任務(在前面的例子中,就是指計算平均值)會導致效能瓶頸,因為這個任務必須等待分佈在叢集中的其他並行任務完成後才能執行。

模型並行

模型並行與資料並行差別很大。不同的機器用相同的資料訓練。然而,一個模型分佈在多臺機器上。深度學習的模型往往很大,因為許多引數常常不是在一臺機器上的。模型並行就是將單個模型分為多個分片。一個節點維護一個模型分片。另一方面,每個訓練程序能非同步更新模型。框架必須對此進行管理以便於保持模型的一致性。實現這個過程的框架,特別是在機器學習領域,叫作“引數伺服器”(parameter server)。深度學習尤其要求實現模型並行,因為深度學習需要用到更多資料,而這意味著最終需要更多引數。

引數伺服器與Spark

如前所述,原始的引數伺服器是為模型並行處理而開發出來的。Spark MLlib的大部分演算法當前在處理資料時僅僅是資料並行,而不是模型並行。為了以一種通用的方式實現模型並行,人們研究和開發出更高效的引數伺服器架構。引數伺服器是在RAM(隨機訪問儲存)上存放以及更新分散式叢集中的模型的。而模型更新常常是分散式機器學習過程的瓶頸所在。SPARK-6932是一個用於研究引數伺服器潛在能力的ticket,也是對各種實現的比較。此外,Spark專案在嘗試基於這項研究去實現它自己的“引數伺服器”。已經有人提供了Spark上的引數伺服器,參見 https://github.com/chouqin/spark/tree/ps-on-Spark-1.3。

深度學習

深度學習因其高準確率及通用性,成為機器學習中最受關注的領域。這種演算法在2011—2012年期間出現,並超過了很多競爭對手。最開始,深度學習在音訊及影象識別方面取得了成功。此外,像機器翻譯之類的自然語言處理或者畫圖也能使用深度學習演算法來完成。深度學習是自1980年以來就開始被使用的一種神經網路。神經網路被看作能進行普適近似(universal approximation)的一種機器。換句話說,這種網路能模仿任何其他函式。深度學習可以看作是組合了許多神經網路的一種深度結構。

如前面提到的引數伺服器,與其他已有的機器學習演算法相比,深度學習需要大量引數及訓練資料。這也是我們介紹能在Spark上執行的深度學習框架的原因。要想在企業環境中穩定地進行深度學習的訓練,必須要有一個可靠而快速的分散式引擎。Spark被視為目前最適合執行深度學習演算法的平臺,是因為:

  • 基於記憶體的處理架構對於使用機器學習的迭代計算,特別是深度學習,十分適合。

  • Spark的幾個生態系統如MLlib及Tachyon對於開發深度學習模型很有用。

下面是一些Spark能用的深度學習框架。這些框架和深度學習一樣,都是比較新的庫。

  • H2O:H2O是用h2o.ai開發的具有可擴充套件性的機器學習框架,它不限於深度學習。H2O支援許多API(例如,R、Python、Scala和Java)。當然它是開源軟體,所以要研究它的程式碼及演算法也很容易。H2O框架支援所有常見的資料庫及檔案型別,可以輕鬆將模型匯出為各種型別的儲存。

  • deeplearning4j:deeplearning4j是由Skymind開發的,Skymind是一家致力於為企業進行商業化深度學習的公司。deeplearning4j框架是建立來在Hadoop及Spark上執行的。這個設計用於商業環境而不是許多深度學習框架及庫目前所大量應用的研究領域。

  • SparkNet:這是本文介紹的最新的庫。SparkNet由加州大學伯克利分校AMP實驗室於2015年11月釋出。而Spark最早就是由AMP實驗室開發的。因此,說SparkNet 是“執行在Spark上的官方機器學習庫”一點兒也不為過。此庫提供了讀取RDD的介面,以及相容深度學習框架Caffe(http://caffe.berkeleyvision.org/)的介面。SparkNet通過採用隨機梯度下降(Stochastic Gradient Descent)獲得了簡單的並行模式。SparkNet job能通過Spark-submit提交。

Spark在企業中的應用

在最後,我們想聊一聊遇到的一些企業實際用例。雖然有些內容屬於公司機密不便公開,但是我們想解釋清楚Spark能做什麼以及怎樣才能充分利用Spark。以下都是我們公司的實際用例。

用Spark及Kafka收集使用者活動日誌

收集使用者活動日誌能幫助提高推薦的準確性以及將公司策略的效果以視覺化形式呈現。Hadoop和Hive主要就用在這個領域。Hadoop是唯一能處理像活動日誌這樣的海量資料的平臺。藉助Hive介面,我們能互動式做一些分析。但是這個架構有三個缺點:

  • Hive做分析很耗時。

  • 實時收集日誌有難度。

  • 需要對每個服務日誌分別進行煩瑣的分析。

為了解決這些問題,這家公司考慮引進Apache Kafka及Spark。Kafka是用於大資料傳送的佇列系統(見圖3)。Kafka自己不處理或轉換資料,它使大量的資料從一個數據中心可靠地傳送到另一個數據中心成為可能。因此,它是構建大規模管道架構不可或缺的平臺。

圖3 Kafka和Spark Steaming的體系結構概覽

Kafka有一個叫作主題(topic)的單元,帶有偏移量及複製管理功能。通過topic及一組名為ConsumerGroup的讀取器,我們就能獲得不同型別的日誌單元。為了做實時處理,我們採用Spark的流處理模組Spark Streaming。嚴格來說,Spark Streaming是一個微批量框架。微批量框架將流分為小資料集,對這些小集合執行批量處理程序。因此就處理演算法而言,批處理跟微批量處理沒有什麼不同。這是我們採用Spark Streaming而不是Storm或者Samza之類的其他流式處理平臺的一個主要原因。我們能方便地把當前的邏輯轉換為Spark Streaming。由於引入了這個架構,我們能獲得如下結果:

  • 用Kafka管理資料的終結。Kafka自動刪除過期的不需要的資料。我們無須處理這些事情。

  • 使資料儲存到儲存(HBase)上的時間縮到最短。我們可以把這個時間從2小時縮短到10~20秒。

  • 由於將一些過程轉換為Spark Streaming,所以減少了視覺化的時間。我們能使這個時間從2小時縮減到5秒。

Spark Streaming很好用,因為它的API基本與Spark相同。因此,熟悉Scala的使用者會很習慣Spark Streaming,而且Spark Streaming也能非常容易地無縫用在Hadoop平臺(YARN)上,不到1個小時就能建立一個做Spark Streaming 的叢集。但需要注意的是,Spark Streaming與普通Spark job不一樣,它會長期佔用CPU及記憶體。為了在固定時間裡可靠地完成資料處理,做一些調優是必要的。如果用Spark Streaming不能非常快地做流式處理(秒級以下的處理),我們推薦你考慮其他流式處理平臺,比如Storm和Samza。

用Spark做實時推薦

機器學習需求最旺盛的領域就是推薦。你可以看到許多推薦案例,比如電子商務、廣告、線上預約服務等。我們用Spark Streaming和GraphX做了一個售賣商品的推薦系統。GraphX是用於分散式圖處理的庫。這個庫是在一個Spark專案下開發的。我們可以用一種稱為彈性分散式屬性圖(resilient distributed property graph)的RDD來擴充套件原始RDD。GraphX提供了對這個圖的基本操作,以及類似Pregel的API。

我們的推薦系統如下。首先從Twitter收集每個使用者的推文(tweet)資料。接著,用Spark Streaming做接下來的微批量處理,每5秒收集一次推文並進行處理。由於推文是用自然語言寫的(在本例中為日語),所以需要用形態分析(morphological analysis)把每個單詞分離開。在第二階段,我們用Kuromoji去做這個分離。為了與我們的商品資料庫建立關係,需要為Kuromoji建立使用者定義字典。這是獲取有意義的推薦最重要的一點(見圖4)。

圖4 Spark Streaming

在第三階段,我們根據每個單詞與商品的關係計算出一個分值。我們還必須調整使用者定義字典,使單詞與商品之間的相關性更好。特別地,我們刪除了非字母字元,並且增加特別的相關詞彙。在這個階段之後,我們就獲得一個從每條推文中收集到的詞的集合。但是這個集合中還有與我們的商品不相關的詞。因此在第四階段,我們用SVM過濾出與商品相關的詞語,以有監督學習方式(supervised learning)訓練SVM:標籤0表示不相關的推文;標籤1表示相關的推文。建立了有監督學習的資料後,就開始訓練模型。接著我們從原始資料提取出相關的推文。最後一步就是分析商品條目與單詞的相關度。如果聚類成功,就能推薦相同聚類中的另一個商品給使用者(見圖5)。

圖5 Spark Steaming分析單詞的相關性

雖然主要的麻煩之處在於建立使用者定義字典,但是關於Spark Streaming也有一些地方需要注意:

  • Map#filterKeys和Map#mapValues不可序列化——在Scala 2.10中不能使用這些transformation。由於Spark 1.1依賴於Scala 2.10,所以我們不能用這些函式。這個問題在Scala 2.11中已經解決。

  • DStream的輸出操作受限制——在目前的DStream.print、saveAsText Files、saveAasObjectFiles、saveAsHadoopFiles與foreachRDD中沒有太多的輸出操作。在其他方法中,什麼操作都會有副作用。例如,println在map函式上就沒有效果。這為除錯帶來了困難。

  • 無法在StreamContext中建立新的RDD——DStream是RDD的連續序列。我們能輕鬆分離或者轉換這個初始的RDD,但是在StreamContext中建立一個全新的RDD則很難。

在這個系統中,我們使用了Spark Streaming、GraphX及Spark MLlib。雖然也能用Solr作為搜尋引擎,但是Spark庫幾乎提供了所有功能。這是Spark最強的特性之一,其他框架則達不到同樣的效果。

Twitter Bots的實時分類

這可能是一種關於興趣愛好的專案。我們已經分析了遊戲角色的Twitter聊天機器人(Twitter Bot),並且可視化了Bot賬戶之間的關係。與前面例子類似,我們用Spark Streaming收集推文資料。遊戲角色的名字可能有不同的拼寫形式。因此我們用搜索引擎Solr轉換推文中獨特的名字。在這個例子中我們覺得Spark Streaming的主要優點是,它已經實現了機器學習演算法(MLlib)及圖演算法(GraphX)。因此我們能立即分析推文,不用準備其他庫或編寫演算法。但是我們缺少資料去顯示有意義的視覺化結果。除此之外,從每個推文內容中提取出有意義的特徵也不容易。這可能是由於當前我們手動搜尋Twitter賬戶,推文資料不足而導致的。具體來說,Spark Streaming是一個可擴充套件的系統,能處理海量資料集。我們認為應該利用好Spark的可擴充套件能力。

總結

本文解釋了Spark 核心社群開發的生態系統庫,介紹了ML/MLlib及Spark Streaming的Spark 庫的具體用法,對於企業的各種用例及框架也進行了介紹。希望對你的開發或日常的業務決策能有所幫助。Spark擁有靈活的架構,其社群也提供了大量生態系統框架,這一切使得Spark有廣泛的應用場景。我們能從spark-packages上註冊的包數量中看到Spark社群的活躍度。截至2017年2月3日,註冊的包的數量已達到319個。

圖6 發展中的Spark

Spark社群是一個欣欣向榮的開源社群,Spark社群在不遠的未來肯定會發生變化。


本文節選並整理自《Spark:大資料叢集計算的生產實踐》一書,Ilya Ganelin(伊利亞·甘列林)等著,李剛譯,周志湖審校。點選閱讀原文瞭解圖書詳情。