1. 程式人生 > >Spark學習(八)---Spark streaming原理

Spark學習(八)---Spark streaming原理

這次我們介紹spark streaming,今天主要是原理和相關的操作

  • Spark Streaming概念介紹
  • Spark Streaming的相關操作

1. Spark Streaming概念

1.1什麼是Spark Streaming

Spark Streaming類似於Apache Storm,用於流式資料的處理。根據其官方文件介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支援的資料來源有很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。資料輸入後可以用Spark的高度抽象操作如:map、reduce、join、window等進行運算。而結果也能儲存在很多地方,如HDFS,資料庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。 在這裡插入圖片描述

1.2 Spark Streaming的特點

  1. 易用,可以像編寫離線批處理一樣去編寫流式程式,支援java/scala/python語言。
  2. 容錯,SparkStreaming在沒有額外程式碼和配置的情況下可以恢復丟失的工作。
  3. 易整合到Spark體系,流式處理與批處理和互動式查詢相結合。

1.3 SparkStreaming與Storm的對比

在這裡插入圖片描述 在這裡插入圖片描述 主要的區別如下:

  • Strom是一個純實時的流式處理框架,即來一條資料就處理一條資料,這樣勢必叢集內有頻繁的網路通訊,吞吐量低。

  • SparkStreaming是微批處理框架,吞吐量高。

  • Strom的事務處理機制要比SparkStreaming的好,SparkStreaming中存在丟資料或者重複計算的問題,Storm中接受或拉取的每條資料可以準確的只處理一次。

  • Strom適合做簡單的彙總型計算,SparkStreaming可以做複雜的計算,因為SparkStreaming是基於Dstream來開發的,Dstream可以抽出RDD(即Spark的核心),支援更多的複雜計算。

  • Strom支援動態資源的調整,而SparkStreaming是粗粒度的資源排程(新版本中即使有也是通過kill excutor的形式)。

2. Spark Streaming原理

Spark Streaming 是基於spark的流式批處理引擎,其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用於處理實時資料流。 在這裡插入圖片描述

2.1 Spark Streaming計算流程

Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行快取或者儲存到外部裝置。下圖顯示了Spark Streaming的整個流程。 下面是SparkStreaming架構圖 在這裡插入圖片描述

2.2 Saprk Streaming的實時性

對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

2.3 Spark Streaming的容錯性

對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。 對於Spark Streaming來說,其RDD的傳承關係如下圖所示: 在這裡插入圖片描述 圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

3. DStream

3.1 什麼是DStream

Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark運算元操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料,如下圖:在這裡插入圖片描述 對資料的操作也是按照RDD為單位來進行的: 在這裡插入圖片描述 Spark Streaming使用資料來源產生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。 它的工作流程像下面的圖所示一樣,接受到實時資料後,給資料分批次,然後傳給Spark Engine處理最後生成該批次的結果。 在這裡插入圖片描述

4. DStream的相關操作

DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的操作,如:updateStateByKey()、transform()以及各種Window相關的操作。

4.1 Transformations on DStreams

Transformations Meaning
map(func) 對DStream中的各個元素進行func函式操作,然後返回一個新的DStream
flatMap(func) 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項
filter(func) 過濾出所有函式func返回值為true的DStream元素並返回一個新的DStream
repartition(numPartitions) 增加或減少DStream中的分割槽數,從而改變DStream的並行度
union(otherStream) 將源DStream和輸入引數為otherDStream的元素合併,並返回一個新的DStream.
count() 通過對DStream中的各個RDD中的元素進行計數,然後返回只有一個元素的RDD構成的DStream
reduce(func) 對源DStream中的各個RDD中的元素利用func進行聚合操作,然後返回只有一個元素的RDD構成的新的DStream.
countByValue() 對於元素型別為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數
reduceByKey(func, [numTasks]) 利用func函式對源DStream中的key進行聚合操作,然後返回新的(K,V)對構成的DStream
join(otherStream, [numTasks]) 輸入為(K,V)、(K,W)型別的DStream,返回一個新的(K,(V,W))型別的DStream
cogroup(otherStream, [numTasks]) 輸入為(K,V)、(K,W)型別的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組型別的DStream
transform(func) 通過RDD-to-RDD函式作用於DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
updateStateByKey(func) 根據key的之前狀態值和key的新值,對key進行更新,返回一個新狀態的DStream
  • 特殊的Transformations

(1)UpdateStateByKey Operation UpdateStateByKey用於記錄歷史記錄,儲存上次的狀態

(2)Window Operations(開窗函式) 滑動視窗轉換操作: 滑動視窗轉換操作的計算過程如下圖所示,我們可以事先設定一個滑動視窗的長度(也就是視窗的持續時間),並且設定滑動視窗的時間間隔(每隔多長時間執行一次計算),然後,就可以讓視窗按照指定時間間隔在源DStream上滑動,每次視窗停放的位置上,都會有一部分DStream被框入視窗內,形成一個小段的DStream,這時,就可以啟動對這個小段DStream的計算。 在這裡插入圖片描述 (1)紅色的矩形就是一個視窗,視窗框住的是一段時間內的資料流。 (2)這裡面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,視窗會slide一次。 所以基於視窗的操作,需要指定2個引數:

  • window length - The duration of the window (3 in the figure)
  • slide interval - The interval at which the window-based operation is performed (2 in the figure). a.視窗大小,一段時間內資料的容器。 b.滑動間隔,每隔多久計算一次。

4.2 Output Operations on DStreams

Output Operations可以將DStream的資料輸出到外部的資料庫或檔案系統,當某個Output Operations被呼叫時(與RDD的Action相同),spark streaming程式才會開始真正的計算過程。

Output Operations Meaning
print() 列印到控制檯
saveAsTextFiles(prefix, [suffix]) 儲存流的內容為文字檔案,檔名為"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) 儲存流的內容為SequenceFile,檔名為"prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) 儲存流的內容為hadoop檔案,檔名為 “prefix-TIME_IN_MS[.suffix]”.
foreachRDD(func) 對Dstream裡面的每個RDD執行func

本次介紹結束,下次課程將以例項演示spark Streaming的資料實時處理。