1. 程式人生 > >Spark學習——Spark Streaming:大規模流式資料處理

Spark學習——Spark Streaming:大規模流式資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。 

  • 複雜的批量資料處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間。
  • 基於歷史資料的互動式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間。
  • 基於實時資料流的資料處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。 
目前已有很多相對成熟的開源軟體來處理以上三種情景,我們可以利用MapReduce來進行批量資料處理,可以用Impala來進行互動式查詢,對於流式資料處理,我們可以採用Storm。對於大多數網際網路公司來說,一般都會同時遇到以上三種情景,那麼在使用的過程中這些公司可能會遇到如下的不便。 

  • 三種情景的輸入輸出資料無法無縫共享,需要進行格式相互轉換。
  • 每一個開源軟體都需要一個開發和維護團隊,提高了成本。
  • 在同一個叢集中對各個系統協調資源分配比較困難。 
BDAS就是以Spark為基礎的一套軟體棧,利用基於記憶體的通用計算模型將以上三種情景一網打盡,同時支援Batch、Interactive、Streaming的處理,且相容支援HDFS和S3等分散式檔案系統,可以部署在YARN和Mesos等流行的叢集資源管理器之上。BDAS的構架如圖1所示,其中Spark可以替代MapReduce進行批處理,利用其基於記憶體的特點,特別擅長迭代式和互動式資料處理;Shark處理大規模資料的SQL查詢,相容Hive的HQL。本文要重點介紹的Spark Streaming,在整個BDAS中進行大規模流式處理。

 

圖1 BDAS軟體棧 

Spark Streaming構架 

  • 計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行疊加,或者儲存到外部裝置。圖2顯示了Spark Streaming的整個流程。

 

圖2 Spark Streaming構架圖


  • 容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。

圖3 Spark Streaming中RDD的lineage關係圖 

  • 對於Spark Streaming來說,其RDD的傳承關係如圖3所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性。所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。 
  • 實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解,以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

  • 擴充套件性與吞吐量:Spark目前在EC2上已能夠線性擴充套件到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的資料量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

圖4 Spark Streaming與Storm吞吐量比較圖

Spark Streaming的程式設計模型 

Spark Streaming的程式設計和Spark的程式設計如出一轍,對於程式設計的理解也非常類似。對於Spark來說,程式設計就是對於RDD的操作;而對於Spark Streaming來說,就是對DStream的操作。下面將通過一個大家熟悉的WordCount的例子來說明Spark Streaming中的輸入操作、轉換操作和輸出操作。 

  • Spark Streaming初始化:在開始進行DStream操作之前,需要對Spark Streaming進行初始化生成StreamingContext。引數中比較重要的是第一個和第三個,第一個引數是指定Spark Streaming執行的叢集地址,而第三個引數是指定Spark Streaming執行時的batch視窗大小。在這個例子中就是將1秒鐘的輸入資料進行一次Spark Job處理。
val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars]) 
  •  Spark Streaming的輸入操作:目前Spark Streaming已支援了豐富的輸入介面,大致分為兩類:一類是磁碟輸入,如以batch size作為時間間隔監控HDFS檔案系統的某個目錄,將目錄中內容的變化作為Spark Streaming的輸入;另一類就是網路流的方式,目前支援Kafka、Flume、Twitter和TCP socket。在WordCount例子中,假定通過網路socket作為輸入流,監聽某個特定的埠,最後得出輸入DStream(lines)。
val lines = ssc.socketTextStream(“localhost”,8888)
  • Spark Streaming的轉換操作:與Spark RDD的操作極為類似,Spark Streaming也就是通過轉換操作將一個或多個DStream轉換成新的DStream。常用的操作包括map、filter、flatmap和join,以及需要進行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我們首先需要將DStream(lines)切分成單詞,然後將相同單詞的數量進行疊加, 最終得到的wordCounts就是每一個batch size的(單詞,數量)中間結果。 
val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)


另外,Spark Streaming有特定的視窗操作,視窗操作涉及兩個引數:一個是滑動視窗的寬度(Window Duration);另一個是視窗滑動的頻率(Slide Duration),這兩個引數必須是batch size的倍數。例如以過去5秒鐘為一個輸入視窗,每1秒統計一下WordCount,那麼我們會將過去5秒鐘的每一秒鐘的WordCount都進行統計,然後進行疊加,得出這個視窗中的單詞統計。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))


但上面這種方式還不夠高效。如果我們以增量的方式來計算就更加高效,例如,計算t+4秒這個時刻過去5秒視窗的WordCount,那麼我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量(如圖5所示),這種方法可以複用中間三秒的統計量,提高統計的效率。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))



圖5 Spark Streaming中滑動視窗的疊加處理和增量處理 

  • Spark Streaming的輸入操作:對於輸出操作,Spark提供了將資料列印到螢幕及輸入到檔案中。在WordCount中我們將DStream wordCounts輸入到HDFS檔案中。
wordCounts = saveAsHadoopFiles(“WordCount”)
  • Spark Streaming啟動:經過上述的操作,Spark Streaming還沒有進行工作,我們還需要呼叫Start操作,Spark Streaming才開始監聽相應的埠,然後收取資料,並進行統計。
ssc.start()

Spark Streaming案例分析 

在網際網路應用中,網站流量統計作為一種常用的應用模式,需要在不同粒度上對不同資料進行統計,既有實時性的需求,又需要涉及到聚合、去重、連線等較為複雜的統計需求。傳統上,若是使用Hadoop MapReduce框架,雖然可以容易地實現較為複雜的統計需求,但實時性卻無法得到保證;反之若是採用Storm這樣的流式框架,實時性雖可以得到保證,但需求的實現複雜度也大大提高了。Spark Streaming在兩者之間找到了一個平衡點,能夠以準實時的方式容易地實現較為複雜的統計需求。 下面介紹一下使用Kafka和Spark Streaming搭建實時流量統計框架。 

  • 資料暫存:Kafka作為分散式訊息佇列,既有非常優秀的吞吐量,又有較高的可靠性和擴充套件性,在這裡採用Kafka作為日誌傳遞中介軟體來接收日誌,抓取客戶端傳送的流量日誌,同時接受Spark Streaming的請求,將流量日誌按序傳送給Spark Streaming叢集。
  • 資料處理:將Spark Streaming叢集與Kafka叢集對接,Spark Streaming從Kafka叢集中獲取流量日誌並進行處理。Spark Streaming會實時地從Kafka叢集中獲取資料並將其儲存在內部的可用記憶體空間中。當每一個batch視窗到來時,便對這些資料進行處理。 
  • 結果儲存:為了便於前端展示和頁面請求,處理得到的結果將寫入到資料庫中。 

相比於傳統的處理框架,Kafka+Spark Streaming的架構有以下幾個優點。 

  • Spark框架的高效和低延遲保證了Spark Streaming操作的準實時性。
  • 利用Spark框架提供的豐富API和高靈活性,可以精簡地寫出較為複雜的演算法。 
  • 程式設計模型的高度一致使得上手Spark Streaming相當容易,同時也可以保證業務邏輯在實時處理和批處理上的複用。 

在基於Kafka+Spark Streaming的流量統計應用執行過程中,有時會遇到記憶體不足、GC阻塞等各種問題。下面介紹一下如何對Spark Streaming應用程式進行調優來減少甚至避免這些問題的影響。 

效能調優 

優化執行時間

  •  增加並行度。確保使用整個叢集的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用叢集資源。
  • 減少資料序列化、反序列化的負擔。Spark Streaming預設將接收到的資料序列化後儲存以減少記憶體的使用。但序列化和反序列化需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的序列化介面可以更高效地使用CPU。 
  • 設定合理的batch視窗。在Spark Streaming中,Job之間有可能存在著依賴關係,後面的Job必須確保前面的Job執行結束後才能提交。若前面的Job執行時間超出了設定的batch視窗,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此,設定一個合理的batch視窗確保Job能夠在這個batch視窗中結束是必須的。 
  • 減少任務提交和分發所帶來的負擔。通常情況下Akka框架能夠高效地確保任務及時分發,但當batch視窗非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常會比使用Fine-Grained Mesos模式有更小的延遲。 

優化記憶體使用

  • 控制batch size。Spark Streaming會把batch視窗內接收到的所有資料存放在Spark內部的可用記憶體區域中,因此必須確保當前節點Spark的可用記憶體至少能夠容納這個batch視窗內所有的資料,否則必須增加新的資源以提高叢集的處理能力。
  • 及時清理不再使用的資料。上面說到Spark Streaming會將接收到的資料全部儲存於內部的可用記憶體區域中,因此對於處理過的不再需要的資料應及時清理以確保Spark Streaming有富餘的可用記憶體空間。通過設定合理的spark.cleaner.ttl時長來及時清理超時的無用資料。 
  • 觀察及適當調整GC策略。GC會影響Job的正常執行,延長Job的執行時間,引起一系列不可預料的問題。觀察GC的執行情況,採取不同的GC策略以進一步減小記憶體回收對Job執行的影響。 

總結 

Spark Streaming提供了一套高效、可容錯的準實時大規模流式處理框架,它能和批處理及即時查詢放在同一個軟體棧中,降低學習成本。如果你學會了Spark程式設計,那麼也就學會了Spark Streaming程式設計,如果理解了Spark的排程和儲存,那麼Spark Streaming也類似。對開源軟體感興趣的讀者,我們可以一起貢獻社群。目前Spark已在Apache孵化器中。按照目前的發展趨勢,Spark Streaming一定將會得到更大範圍的使用。