1. 程式人生 > >大資料分析技術與實戰之 Spark Streaming

大資料分析技術與實戰之 Spark Streaming

Spark是基於記憶體的大資料綜合處理引擎,具有優秀的作業排程機制和快速的分散式計算能力,使其能夠更加高效地進行迭代計算,因此Spark能夠在一定程度上實現大資料的流式處理。

隨著資訊科技的迅猛發展,資料量呈現出爆炸式增長趨勢,資料的種類與變化速度也遠遠超出人們的想象,因此人們對大資料處理提出了更高的要求,越來越多的領域迫切需要大資料技術來解決領域內的關鍵問題。在一些特定的領域中(例如金融、災害預警等),時間就是金錢、時間可能就是生命!然而傳統的批處理框架卻一直難以滿足這些領域中的實時性需求。為此,湧現出了一批如S4、Storm的流式計算框架。Spark是基於記憶體的大資料綜合處理引擎,具有優秀的作業排程機制和快速的分散式計算能力,使其能夠更加高效地進行迭代計算,因此Spark能夠在一定程度上實現

大資料的流式處理

Spark Streaming是Spark上的一個流式處理框架,可以面向海量資料實現高吞吐量、高容錯的實時計算。Spark Streaming支援多種型別資料來源,包括Kafka、Flume、trwitter、zeroMQ、Kinesis以及TCP sockets等,如圖1所示。Spark Streaming實時接收資料流,並按照一定的時間間隔將連續的資料流拆分成一批批離散的資料集;然後應用諸如map、reducluce、join和window等豐富的API進行復雜的資料處理;最後提交給Spark引擎進行運算,得到批量結果資料,因此其也被稱為準實時處理系統。

 

 

圖1 Spark Streaming支援多種型別資料來源

目前應用最廣泛的大資料流式處理框架是Storm。Spark Streaming 最低0.5~2s做一次處理(而Storm最快可達0.1s),在實時性和容錯方面不如Storm。然而Spark Streaming的整合性非常好,通過RDD不僅能夠與Spark上的所有元件無縫銜接共享資料,還能非常容易地與Kafka、Flume等分散式日誌收集框架進行整合;同時Spark Streaming的吞吐量非常高,遠遠優於Storm的吞吐量,如圖2所示。所以雖然Spark Streaming的處理延遲高於Storm,但是在整合性與吞吐量方面的優勢使其更適用於大資料背景。

 

 

圖2 Spark Streaming與Storm吞吐量比較圖

Spark Streaming基礎概念

批處理時間間隔

在Spark Streaming中,對資料的採集是實時、逐條進行的,但是對資料的處理卻是分批進行的。因此,Spark Streaming需要設定一個時間間隔,將該時間間隔內採集到的資料統一進行處理,這個間隔稱為批處理時間間隔。

也就是說對於源源不斷的資料,Spark Streaming是通過切分的方式,先將連續的資料流進行離散化處理。資料流每被切分一次,對應生成一個RDD,每個RDD都包含了一個時間間隔內所獲取到的所有資料,因此資料流被轉換為由若干個RDD構成的有序集合,而批處理時間間隔決定了Spark Streaming需要多久對資料流切分一次。Spark Streaming是Spark上的元件,其獲取的資料和資料上的操作最終仍以Spark作業的形式在底層的Spark核心中進行計算,因此批處理時間間隔不僅影響資料處理的吞吐量,同時也決定了Spark Streaming向Spark提交作業的頻率和資料處理的延遲。需要注意的是,批處理時間間隔的設定會伴隨Spark Streaming應用程式的整個生命週期,無法在程式執行期間動態修改,所以需要綜合考慮實際應用場景中的資料流特點和叢集的處理效能等多種因素進行設定。

視窗時間間隔

視窗時間間隔又稱為視窗長度,它是一個抽象的時間概念,決定了Spark Streaming對RDD序列進行處理的範圍與粒度,即使用者可以通過設定視窗長度來對一定時間範圍內的資料進行統計和分析。如果設批處理時間設為1s,視窗時間間隔為3s,如3圖所示,其中每個實心矩形表示Spark Streaming每1秒鐘切分出的一個RDD,若干個實心矩形塊表示一個以時間為序的RDD序列,而透明矩形框表示視窗時間間隔。易知視窗內RDD的數量最多為3個,即Spark Streming 每次最多對3個RDD中的資料進行統計和分析。對於視窗時間間隔還需要注意以下幾點:

以圖3為例,在系統啟動後的前3s內,因進入視窗的RDD不足3個,但是隨著時間的推移,最終視窗將被填滿。

不同視窗內所包含的RDD可能會有重疊,即當前視窗內的資料可能被其後續若干個視窗所包含,因此在一些應用場景中,對於已經處理過的資料不能立即刪除,以備後續計算使用。

視窗時間間隔必須是批處理時間間隔的整數倍。

 

 

圖3 視窗時間間隔示意圖

滑動時間間隔

滑動時間間隔決定了Spark Streaming對資料進行統計與分析的頻率,多出現在與視窗相關的操作中。滑動時間間隔是基於批處理時間間隔提出的,其必須是批處理時間間隔的整數倍。在預設的情況下滑動時間間隔設定為與批處理時間間隔相同的值。如果批處理時間間隔為1s,視窗間隔為3s,滑動時間間隔為2s,如圖4所示,其含義是每隔2s對過去3s內產生的3個RDD進行統計分析。

 

 

圖4 滑動時間間隔、視窗時間間隔、批處理時間間隔綜合示意圖

DStream基本概念

DStream是Spark Streaming的一個基本抽象,它以離散化的RDD序列的形式近似描述了連續的資料流。DStream本質上是一個以時間為鍵,RDD為值的雜湊表,儲存了按時間順序產生的RDD,而每個RDD封裝了批處理時間間隔內獲取到的資料。Spark Streaming每次將新產生的RDD新增到雜湊表中,而對於已經不再需要的RDD則會從這個雜湊表中刪除,所以DStream也可以簡單地理解為以時間為鍵的RDD的動態序列。設批處理時間間隔為1s,圖5為4s內產生的DStream示意圖。

 

 

圖5 DStream示意圖

Spark Streaming程式設計模式與案例分析

Spark Streaming程式設計模式

下面以Spark Streaming官方提供的WordCount程式碼為例來介紹Spark Streaming的使用方式。

示例1:

 

 

Spark Streaming應用程式在功能結構上通常包含以下五部分,如上述示例1所示。

匯入Spark Streaming相關包:Spark Streaming作為Spark框架上的一個元件,具有很好的整合性。在開發Spark Streaming應用程式時,只需匯入Spark Streaming相關包,無需額外的引數配置。

建立StreamingContext物件:同Spark應用程式中的SparkContext物件一樣, StreamingContext物件是Spark Streaming應用程式與叢集進行互動的唯一通道,其中封裝了Spark叢集的環境資訊和應用程式的一些屬性資訊。在該物件中通常需要指明應用程式的執行模式(示例1中設為local[2])、設定應用程式名稱(示例1中設為NetworkWordCount)、設定批處理時間間隔(示例1中設為Seconds(1)即1秒鐘),其中批處理時間間隔需要根據使用者的需求和叢集的處理能力進行適當地設定。

建立InputDStream:Spark Streaming需要根據資料來源型別選擇相應的建立DStream的方法。示例1中Spark Streaming通過StreamingContext物件呼叫socketTextStream方法處理以socket連線型別資料來源,創建出DStream即lines。Spark Streaming同時支援多種不同的資料來源型別,其中包括Kafka、Flume、HDFS/S3、Kinesis和Twitter等資料來源。

操作DStream:對於從資料來源得到的DStream,使用者可以呼叫豐富的操作對其進行處理。示例1中針對lines的一系列操作就是一個典型的WordCount執行流程:對於當前批處理時間間隔內的文字資料以空格進行切分,進而得到words;再將words中每個單詞轉換為二元組,進而得到pairs;最後利用reduceByKey方法進行統計。

啟動與停止Spark Streaming應用程式:在啟動Spark Streaming應用程式之前,DStream上所有的操作僅僅是定義了資料的處理流程,程式並沒有真正連線上資料來源,也沒有對資料進行任何操作,當ssc.start()啟動後程序中定義的操作才會真正開始執行。

文字檔案資料處理案例

功能需求

實時監聽並獲取本地home/dong/Streamingtext目錄中新生成的檔案(檔案均為英文文字檔案,單詞之間使用空格進行間隔),並對檔案中各單詞出現的次數進行統計。

程式碼實現

 

 

執行演示

第1步,啟動Hadoop與Spark。

 

 

第2步,建立Streaming監控目錄。

 

 

在dong使用者主目錄下建立Streamingtext為Spark Streaming監控的目錄,建立後如圖6所示。

 

 

圖6 dong使用者主目錄下建立Streamingtext資料夾

第3步,在IntelliJ IDEA中編輯執行Streaming程式。在IntelliJ IDEA中建立工程StreamingFileWordCount,編輯物件StreamingFileWordCount,如圖7所示。

 

 

圖7 IntelliJ IDEA中StreamingFileWordCount示意圖

由於該示例沒有輸入引數,因此不需要配置引數,可直接單擊右鍵->單擊"Run‘StreamingFileWordCount’ "。

第4步,在監聽目錄下建立文字檔案。在master節點上的/home/dong/Streamingtext中分別建立file1.txt與file2.txt。

file1.txt內容如下:

 

 

file2.txt內容如下:

 

 

建立後,/home/dong/Streamingtext中內容如圖8所示。

 

 

圖8 Streamingtext資料夾內容示意圖

檢視結果

終端視窗輸出了每個批處理時間間隔(20秒)內,/home/dong/Streamingtext中新生成檔案所包含的各單詞個數,如圖9所示。

 

 

圖9 StreamingFileWordCount執行結果示意圖

網路資料處理案例

功能需求

監聽本地節點指定埠傳輸的資料流(本案例為master節點9999埠的英文文字資料,以逗號間隔單詞),每5秒統計一次該時間間隔內收集到的各單詞的個數。

程式碼實現

本案例涉及資料流模擬器和分析器兩部分。為了更接近真實的網路環境,首先定義資料流模擬器,該模擬器以Socket方式監聽網路中指定節點上的指定埠號(master節點9999埠),當外部程式通過該埠連線並請求資料時,資料流模擬器將定時地從指定文字檔案中隨機選取資料傳送至指定埠(每間隔1秒鐘資料流模擬器從master節點上的/home/dong/Streamingtext/file1.txt中隨機擷取一行文字傳送給master節點的9999埠),通過這種方式模擬網路環境下源源不斷的資料流。針對獲取到的實時資料,再定義分析器(Spark Streaming應用程式),用以統計時間間隔(5秒)內收集到的單詞個數。

資料流模擬器程式碼實現如下:

 

 

分析器程式碼如下:

 

 

執行演示

第1步,在IntelliJ IDEA中編輯執行Streaming程式。master節點啟動IntelliJ IDEA,建立工程NetworkWordCount,編輯模擬器與分析器。模擬器如圖10所示,分析器如圖11所示。

 

 

圖10 IntelliJ IDEA中資料流模擬器示意圖

 

 

圖11 IntelliJ IDEA中分析器示意圖

第2步,建立模擬器資料來源檔案。在master節點建立/home/dong/Streamingtext目錄,在其中建立文字檔案file1.txt。

file1.txt內容如下:

 

 

第3步,打包資料流模擬器。打包過程詳見本書4.3.3節。在Artifacts打包配置介面中,根據使用者實際scala安裝目錄,在Class Path中新增下述scala依賴包,如圖12所示。

 

 

 

 

圖12 在Class Path中新增scala依賴包

打包後在主目錄下生成NetworkWordCount.jar,如圖13所示。

圖13 在dong使用者主目錄下生成NetworkWordCount.jar示意圖

第4步,啟動資料流模擬器。在master節點開啟控制終端,通過下面程式碼啟動資料流模擬器。

 

 

資料流模擬器每間隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機擷取一行文字傳送給master節點的9999埠。在分析器未連線時,資料流模擬器處於阻塞狀態,終端不會顯示輸出的文字。

第5步,執行分析器。在master上啟動IntelliJ IDEA編寫分析器程式碼,然後單擊選單"Build->"Build Artifacts",通過Application選項配置分析器執行所需的引數,其中Socket主機名為master、埠號為9999,引數之間用空格間隔,如圖13所示。

 

 

圖13 分析器引數配置示意圖

配置好引數後返回IntelliJ IDEA選單欄,單擊"Run"->"Build Artifacts"執行分析器。

檢視結果

第1步,在master上檢視資料流模擬器執行情況。IntelliJ IDEA執行分析器從而與資料流模擬器建立連線。當檢測到外部連線時,資料流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機擷取一行文字傳送給master節點上的9999埠。為方便講解和說明,file1.txt中每一行只包含一個單詞,因此資料流模擬器每次僅傳送一個單詞給埠,如圖14所示。

 

 

圖14 在master上模擬器執行結果

第2步,在master的IntelliJ IDEA中檢視分析器執行情況。在IntelliJ IDEA的執行日誌視窗中,可以觀察到統計結果。通過分析可知Spark Streaming每個批處理時間間隔內獲取的單詞數為5,剛好是5秒內傳送單詞的總數,並對各單詞進行了統計,如圖15所示。

 

 

圖15 IntelliJ IDEA中分析器執行結果

stateful應用案例

在很多資料流相關的實際應用場景中,對當前資料的統計分析需要藉助於先前的資料處理結果完成。例如電商每間隔10分鐘統計某一商品當前累計銷售總額、車站每隔3小時統計當前客流總量,等等。此類應用需求可藉助於Spark Streaming的有狀態轉換操作實現。

功能需求

監聽網路中某節點上指定埠傳輸的資料流(slave1節點9999埠的英文文字資料,以逗號間隔單詞),每5秒分別統計各單詞的累計出現次數。

程式碼實現

本案例功能的實現涉及資料流模擬器和分析器兩部分。

分析器程式碼:

 

 

執行演示

第1步,slave1節點啟動資料流模擬器。

第2步,打包分析器。master節點啟動IntelliJ IDEA建立工程StatefulWordCount編輯分析器,如圖16所示,並將分析器直接打包至master節點dong使用者的主目錄下,如圖17所示。

 

 

圖16 IntelliJ IDEA中StatefulWordCount示意圖

 

 

圖17 master上的StatefulWordCount.jar示意圖

第3步,執行分析器。在master節點開啟終端,通過下面程式碼向Spark叢集提交應用程式。

 

 

檢視結果

第1步,檢視slave1上資料流模擬器執行情況。分析器在叢集上提交執行後與slave1上執行的資料流模擬器建立連線。當檢測到外部連線時,資料流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機擷取一行文字傳送給slave1節點上的9999埠。由於該文字檔案中每一行只包含一個單詞,因此每秒僅傳送一個單詞給埠。如圖18所示。

 

 

圖18 slave1上資料流模擬器執行示意圖

第2步,檢視master上分析器執行情況。在master節點的提交視窗中可以檢視到統計結果,如圖19所示。

 

 

圖19 master上分析器執行示意圖

圖中表明截至147920770500ms分析器共接收到14個單詞,其中"spark"累計出現3次,"hbase"累計出現5次,"hello"累計出現3次,"world"累計出現3次。由於批處理時間間隔是5s,模擬器每1秒傳送1個單詞,使得分析器在5s內共接收到5個單詞,因此截止至147920771000ms,分析器共收到19個單詞,其中"spark"累計出現5次,"hbase"累計出現7次,"hello"累計出現4次,"world"累計出現3次。

第3步,檢視HDFS中持久化目錄。執行後檢視HDFS上的持久化目錄/user/dong/input/StatefulWordCountlog,如圖20所示。Streaming應用程式將接收到的網路資料持久化至該目錄下,便於容錯處理。

 

 

圖20 HDFS上持久化目錄示意圖

window應用案例

在實際生產環境中,與視窗相關的應用場景很常見,例如電商每間隔10分鐘小時統計某一商品前30分鐘內累計銷售總額、車站每隔1小時統計前3個小時內的客流量等,此類需求可藉助Spark Streaming中的window相關操作實現。window應用案例同時涉及批處理時間間隔、視窗時間間隔與滑動時間間隔。

功能需求

監聽網路中某節點上指定埠傳輸的資料流(slave1節點上9999埠的英文文字資料,以逗號間隔單詞),每10秒統計前30秒各單詞累計出現的次數。

程式碼實現

本例功能的實現涉及資料流模擬器和分析器兩部分。

分析器程式碼:

 

 

執行演示

第1步,slave1節點啟動資料流模擬器。

第2步,打包分析器。在master節點啟動IntelliJ IDEA建立工程WindowWordCount編輯分析器,如圖21,並將分析器直接打包至master節點dong使用者的主目錄下,如圖22所示。

 

 

圖21 IntelliJ IDEA中WindowWordCount示意圖

 

 

圖22 master上WindowWordCount.jar示意圖

第3步,執行分析器。在master節點開啟終端,通過下面程式碼向Spark叢集提交應用程式。

 

 

檢視結果

第1步 在slave1上檢視資料流模擬器執行情況。分析器在叢集上提交執行後與slave1上執行的資料流模擬器建立連線。當檢測到外部連線時,資料流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機擷取一行文字傳送給slave1節點的9999埠。由於該文字檔案中每一行只包含一個單詞和一個逗號,因此每秒僅傳送一個單詞和一個逗號給埠,如圖23所示。

 

 

圖23 在slave1上資料流模擬器執行示意圖

第2步,在master上檢視分析器執行情況。在master節點的提交視窗中可以檢視到統計結果。在WindowWordCount應用程式啟動初期,視窗並沒有被接收到的單詞填滿,但隨著時間的推進,每個視窗中的單詞數目最終固定為30個。圖7.35只是截取了執行結果中的三個批次。由於設定了視窗時間間隔是30s,滑動時間間隔是10s,且資料流模擬器每間隔1s傳送一個單詞,因此WindowWordCount每間隔10s對過去30s內收到的各單詞個數進行統計。圖24中截至1479276925000ms分析器對過去30s內收到的30個單詞進行統計,其中"spark"累計出現5次,"hbase"累計出現8次,"hello"累計出現9次,"world"累計出現8次。再間隔10s,截至1479276935000ms,分析器對過去30s內收到的30個單詞進行統計,其中"spark"累計出現8次,"hbase"累計出現9次,"hello"累計出現7次,"world"累計出現6次。

 

 

圖24 在master上分析器執行示意圖

第3步,檢視持久化資料。執行後檢視HDFS上的持久化目錄/user/dong/input/WindowWordCountlog,如圖25所示。Streaming應用程式將接收到的網路資料持久化至該目錄下,便於容錯處理。

 

 

圖25 HDFS上持久化目錄示意圖

效能考量

在開發Spark Streaming應用程式時,要結合叢集中各節點的配置情況儘可能地提高資料處理的實時性。在調優的過程中,一方面要儘可能利用叢集資源來減少每個批處理的時間;另一方面要確保接收到的資料能及時處理掉。

執行時間優化

設定合理的批處理時間和視窗大小

Spark Streaming中作業之間通常存在依賴關係,後面的作業必須確保前面的作業執行結束後才能提交,若前面的作業的執行時間超過了設定的批處理時間間隔,那麼後續的作業將無法按時提交執行,造成作業的堵塞。也就是說若想Spark Streaming應用程式穩定地在叢集中執行,對於接收到的資料必須儘快處理掉。例如若設定批處理時間為1秒鐘,那麼系統每1秒鐘生成一個RDD,如果系統計算一個RDD的時間大於1秒,那麼當前的RDD還沒來得及處理,後續的RDD已經提交上來在等待處理了,這就產生了堵塞。因此需要設定一個合理的批處理時間間隔以確保作業能夠在這個批處理時間間隔時間內結束。許多實驗資料表明,500毫秒對大多Spark Streaming應用而言是較好的批處理時間間隔。

類似地,對於視窗操作,滑動時間間隔對於效能也有很大的影響。當單批次資料計算代價過高時,可以考慮適當增大滑動時間間隔。

對於批處理時間和視窗大小的設定,並沒有統一的標準。通常是先從一個比較大的批處理時間(10秒左右)開始,然後不斷地使用更小的值進行對比測試。如果Spark Streaming使用者介面中顯示的處理時間保持不變,則可以進一步設定更小的值;如果處理時間開始增加,則可能已經達到了應用的極限,再減小該值則可能會影響系統的效能。

提高並行度

提高並行度也是一種減少批處理所消耗時間的常見方法。有以下三種方式可以提高並行度。一種方法是增加接收器數目。如果獲取的資料太多,則可能導致單個節點來不及對資料進行讀入與分發,使得接收器成為系統瓶頸。這時可以通過建立多個輸入DStream來增加接收器數目,然後再使用union來把資料合併為一個數據源。第二種方法是將收到的資料顯式地重新分割槽。如果接收器數目無法再增加,可以通過使用DStream.repartition、spark.streaming.blocklnterval等引數顯式地對Dstream進行重新分割槽。第三種方法是提高聚合計算的並行度。對於會導致shuffle的操作,例如reduceByKey、reduceByKeyAndWindow等操作,可通過顯示設定更高的行度引數確保更為充分地使用叢集資源。

記憶體使用與垃圾回收

控制批處理時間間隔內的資料量

Spark Streaming會把批處理時間間隔內獲取到的所有資料存放在Spark內部可用的記憶體中。因此必須確保在當前節點上SparkStreaming可用的記憶體容量至少能容下一個批處理時間間隔內所有的資料。比如一個批處理時間間隔是1秒,但是1秒產生了1GB的資料,那麼要確保當前的節點上至少有可供SparkStreaming使用的1GB記憶體。

及時清理不再使用的資料

對於記憶體中處理過的、不再需要的資料應及時清理,以確保Spark Streaming能夠擁有足夠的記憶體空間可以使用。一種方法是可以通過設定合理的spark.cleaner.ttl時長來及時清理超時的無用資料,但該方法應慎重使用,以免後續資料在需要時被錯誤清理。另一種方法是將spark.streaming.unpersist設定為true,系統將自動清理已經不需要的RDD。該方法能顯著減少RDD對記憶體的需要,同時潛在地提高GC的效能。此外使用者還可以通過配置引數streamingContext.remember為資料設定更長的保留時間。

減少序列化與反序列化的負擔

SparkStreaming預設將接收到的資料序列化後放入記憶體,以減少記憶體使用。序列化和反序列化需要更多的CPU資源,因此使用適當的序列化工具(例如Kryo)和自定義的序列化介面可以更高效地使用CPU。除了使用更好的序列化工具外還可以結合壓縮機制,通過配置spark.rdd.compress,以CPU的時間開銷來換取記憶體資源,降低GC開銷。

大家喜歡多多關注,你的關注是我最大的動力。