1. 程式人生 > >大資料(Spark-Spark Streaming的架構及原理)

大資料(Spark-Spark Streaming的架構及原理)

流式計算

資料的時效性

日常工作中,我們一般會先把資料儲存在一張表中,然後對這張表的資料進行加工、分析。既然資料要儲存在表中,就有時效性這個概念。
如果我們處理的是年級別的資料,比如人口分析、巨集觀經濟分析,那麼資料最新日期距今晚個一兩週、甚至一兩個月都沒什麼關係。
如果我們處理的是天級別的資料,比如各大網站的使用者偏好分析、零售供銷分析,一般晚個幾天也是可以的,即 T+N 更新。
如果是小時級別的資料,對時效性要求就更高了,比如金融風控,涉及到資金的安全,必須有一張小時級別的資料。
那麼還有沒有要求更高的?當然有了,比如風險監測,網站必須有實時監測系統,一旦有攻擊,就必須立刻採取措施,雙十一或者週年慶的時候,各大電商平臺都經歷著嚴峻的流量考驗,也必須對系統進行實時的監測。此外,網站的實時個性化推薦、搜尋引擎中也對實時性有極高的要求。
在這種場景下,傳統的資料處理流程——先收集資料,然後放到DB中,再取出來分析——就無法滿足這麼高的實時要求。
流式計算,在實時或者準實時的場景下,應運而生。

(1)與批量計算那樣慢慢積累資料不同,流式計算將大量資料平攤到每個時間點上,連續地進行小批量的進行傳輸,資料持續流動,計算完之後就丟棄。
(2) 批量計算是維護一張表,對錶進行實施各種計算邏輯。流式計算相反,是必須先定義好計算邏輯,提交到流式計算系統,這個計算作業邏輯在整個執行期間是不可更改的。
(3) 計算結果上,批量計算對全部資料進行計算後傳輸結果,流式計算是每次小批量計算後,結果可以立刻投遞到線上系統,做到實時化展現。

(1) 流式計算流程
① 提交流計算作業。
② 等待流式資料觸發流計算作業。
③ 計算結果持續不斷對外寫出。

(2) 流式計算特點
① 實時、低延遲
② 無界,資料是不斷無終止的
③ 連續,計算持續進行,計算完之後資料即丟棄

Apache Storm

在Storm中,先要設計一個用於實時計算的圖狀結構,我們稱之為拓撲(topology)。這個拓撲將會被提交給叢集,由叢集中的主控節點(master node)分發程式碼,將任務分配給工作節點(worker node)執行。一個拓撲中包括spout和bolt兩種角色,其中spout傳送訊息,負責將資料流以tuple元組的形式傳送出去;而bolt則負責轉換這些資料流,在bolt中可以完成計算、過濾等操作,bolt自身也可以隨機將資料傳送給其他bolt。由spout發射出的tuple是不可變陣列,對應著固定的鍵值對。

Apache Flink

Flink 是一個針對流資料和批資料的分散式處理引擎。它主要是由 Java 程式碼實現。對 Flink 而言,其所要處理的主要場景就是流資料,批資料只是流資料的一個極限特例而已。再換句話說,Flink 會把所有任務當成流來處理,這也是其最大的特點。Flink 可以支援本地的快速迭代,以及一些環形的迭代任務。並且 Flink 可以定製化記憶體管理。在這點,如果要對比 Flink 和 Spark 的話,Flink 並沒有將記憶體完全交給應用層。這也是為什麼 Spark 相對於 Flink,更容易出現 OOM 的原因(out of memory)。就框架本身與應用場景來說,Flink 更相似與 Storm。

Apache Spark Streaming

Spark Streaming是核心Spark API的一個擴充套件,它並不會像Storm那樣一次一個地處理資料流,而是在處理前按時間間隔預先將其切分為一段一段的批處理作業。Spark針對持續性資料流的抽象稱為DStream(DiscretizedStream),一個DStream是一個微批處理(micro-batching)的RDD(彈性分散式資料集);而RDD則是一種分散式資料集,能夠以兩種方式並行運作,分別是任意函式和滑動視窗資料的轉換。

Storm, Flink, Spark Streaming的對比圖

Storm, Flink, Spark Streaming的選擇

如果你想要的是一個允許增量計算的高速事件處理系統,Storm會是最佳選擇。

如果你必須有狀態的計算,恰好一次的遞送,並且不介意高延遲的話,那麼可以考慮Spark Streaming,特別如果你還計劃圖形操作、機器學習或者訪問SQL的話,Apache Spark的stack允許你將一些library與資料流相結合(Spark SQL,Mllib,GraphX),它們會提供便捷的一體化程式設計模型。尤其是資料流演算法(例如:K均值流媒體)允許Spark實時決策的促進。

Flink支援增量迭代,具有對迭代自動優化的功能,在迭代式資料處理上,比Spark更突出,Flink基於每個事件一行一行地流式處理,真正的流式計算,流式計算跟Storm效能差不多,支援毫秒級計算,而Spark則只能支援秒級計算。

Spark Streaming 簡介

Spark Streaming 是Spark 核心API的一個擴充套件,可以實現高吞吐量的、具備容錯機制的實時流資料的處理。支援多種資料來源獲取資料,包括Kafka、Flume、Zero MQ,Kinesis以及TCP Sockets,從資料來源獲取資料之後,可以使用諸如map、reduce、join和window等高階函式進行復雜演算法的處理。最後還可以將處理結果儲存到檔案系統,資料庫和現場儀表盤。

在”One Stack rule them all”的基礎上,可以使用Spark的其他子框架,如叢集學習、圖計算等,對流資料進行處理。

Spark的各個子框架都是基於Spark Core的,Spark Streaming在內部的處理機制是,接收實時流的資料,並根據一定的時間間隔拆分成一批批的資料,然後通過Spark Enging處理這些批資料,最終得到處理後的一批批結果資料。

對應的批資料,在Spark核心對應一個RDD例項,因此,對應流資料的DStream可以看成是一組RDDS,即RDD的一個序列。通俗點理解的話,在流資料分成一批一批後,通過一個先進先出的佇列,然後Spark Enging從該佇列中依次取出一個個批資料,把批資料封裝成一個個RDD,然後進行處理,這是一個典型的生產者/消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。

離散流(discretized stream)或DStream
這是SparkStraming對內部持續的實時資料流的抽象描述,即我們處理的一個實時資料流,在Spark Streaming中對應於一個DStream例項。

批資料(batch data)
這是化整為零的第一步,將實時流資料以時間片為單位進行分批,將流處理轉化為時間片資料的批處理。隨著持續時間的推移,這些處理結果就形成了對應的結果資料流了。

時間片或批處理時間間隔(batch interval)
這是人為地對資料流進行定量的標準,以時間片作為我們拆分資料流的依據。一個時間片的資料對應一個RDD例項。

視窗長度(window length)
一個視窗覆蓋的流資料的時間長度。必須是批處理時間間隔的倍數。

滑動時間間隔
前一個視窗到後一個視窗所經過的時間長度。必須是批處理時間間隔的倍數。

Input DStream
一個input DStream是一個特殊的DStream,將Spark Streaming連線到一個外部資料來源來讀取資料。

 

Spark Streaming 架構

在Spark Streaming中,資料處理是按批進行的,而資料採集是逐條進行的,因此在Spark Streaming中會事先設定好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的資料彙總起來稱為一批資料交個系統區處理。

對於視窗操作而言,在其視窗內部會有N個批處理資料,批處理資料的大小由視窗間隔(window duration)決定,而視窗間隔指的就是視窗的持續時間,在視窗操作中,只有視窗的長度滿足了才會觸發批處理的處理。除了視窗的長度,視窗操作還有另一個重要的引數就是滑動間隔(slide duration),它指的是經過多長時間視窗滑動一次形成新的視窗,滑動視窗預設情況下和批次間隔的相同,而視窗間隔一般設定的要比它們兩個大。在這裡必須注意的一點是滑動間隔和視窗間隔的大小一定得設定為批處理間隔的整數倍。

Spark Streaming是一個對實時資料流進行高通量、容錯處理的流式處理系統,可以對多種資料來源(如Kafka、Flume、Zero MQ和TCP套接字)進行類似Map、Reduce和Join等複雜操作,並將結果儲存到外部檔案系統、資料庫或應用到實時儀表盤。

計算流程

Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distrbute Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者儲存到外部裝置。

容錯性

對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯性機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。

對於Spark Streaming來說,其RDD的傳承關係如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size鎖產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自磁碟,例如HDFS(多份拷貝)或是來自與網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

實時性

對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5 ~ 2秒之間(Stom目前最小的延遲在100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

擴充套件性與吞吐量

Spark目前在EC2上已經能夠線性擴充套件到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的資料量(60M records/s),其吞吐量也比流行的Storm高2~5倍,以下是Berkeley利用WordCount和Grep兩個用例所做的測試。

 

Spark Streaming 持久化

與RDD一樣,DStream同樣也能通過persist()方法將資料流存放在記憶體中,預設的持久化方法是MEMORY_ONLY_SER,也就是在記憶體中存放資料同時序列化的方式,這樣做的好處是遇到需要多次迭代計算的程式時,速度優勢十分的明顯。而對於一些基於視窗的操作,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,如updateStateByKey,其預設的持久化策略就是儲存在記憶體中。

對於來自網路的資料來源(Kafka、Flume、sockets等),預設的持久化策略是將資料儲存在兩臺機器上,這也是為了容錯性而設計的。

另外,對於視窗和有狀態的操作必須checkpont,通過StreamingContext的checkpoint來指定目錄,通過DStream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。

Spark Streaming 效能優化

1,優化執行時間

增加並行度 
確保使用整個叢集的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分的使用叢集資源。

減少資料序列化,反序列化的負擔 
Spark Streaming預設將接受到的資料序列化後儲存,以減少記憶體的使用。但是序列化和反序列化需要更多的CPU時間,因此更加高效的序列化方式和自定義的序列化介面以更高效的使用CPU。

設定合理的batch duration(批處理時間)
在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的作業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此設定一個合理的批處理間隔以確保作業能夠在這個批處理間隔內結束是必須的。

2,優化記憶體使用
控制batch size(批處理間隔內的資料量)
Spark Streaming會把批處理間隔內接收到的所有資料存放在Spark內部的可用記憶體區域中,因此必須確保當前節點Spark的可用記憶體中至少能容納這個批處理時間間隔內的所有資料,否則必須增加新的資源以提高叢集的處理能力。

及時清理不再使用的資料 
前面講到Spark Streaming會將接受的資料應及時清理,以確保Spark Streaming有富餘的可用記憶體空間。通過設定合理的spark.cleaner.ttl時長來及時清理超時的無用資料,這個引數需要小心設定以免後續操作中所需要的資料被超時錯誤處理。

觀察及適當調整GC策略 
GC會影響Job的正常執行,可能延長Job的執行時間,引起一系列不可預料的問題。觀察GC的執行情況,採用不同的GC策略以進一步減小記憶體回收對Job執行的影響。