1. 程式人生 > >Spark學習筆記 --- SparkStreaming 中基本概念

Spark學習筆記 --- SparkStreaming 中基本概念

StreamingContext

StreamingContext 是Spark Streaming程式的入口點,正如SparkContext是Spark程式的入口點一樣。

StreamingContext中維護了一個SparkContext例項,你可以通過 ssc.sparkContext 來訪問它。該SparkContext例項要麼在建立StreamingContext時被傳入,要麼在StreamingContext內部根據傳入的SparkConf進行建立,這取決於你所使用的StreamingContext建構函式。

關於DStream

Spark Streaming將流資料抽象為離散化流(discretized stream),即 DStream

 。DStream在內部被表示為一個連續的RDD序列,每一個RDD包含了一個固定時間間隔內資料來源所產生的資料,如下圖所示。


對DStream所進行的操作將被轉換為對底層RDD的操作。例如,在前面的流資料單詞計數示例程式中, lines.flatMap(_.split(" ")) 語句中的 flatMap 運算元就被應用到lines DStream中的RDD以生成words DStream中的RDD,如下圖所示。


InputDStream和ReceiverInputDStream

InputStream是所有輸入流的抽象基類。 這個類提供start()和stop()方法,這兩個方法被

Spark 流系統用來啟動開始接收資料和停止接收資料。

如果輸入流僅在執行在driver結點的服務和執行緒產生的新資料生成RDD,那麼可能直接繼承這個InputStream。以FileINputDStream為例,它是InputStream的一個子類,在driver端監控一個HDFS目錄,如果有新檔案生成,則用新檔案生成RDDs。 

如果實現的輸入流需要在工作結點執行一個接收器,使用[[org.apache.spark.streaming.dstream.ReceiverInputDStream]]作為父類。

ReceiverInputDStream是一個抽像類,用於需要在工作結點啟動一個接收器來接收外部資料。 ReceiverInputDStream的具體實現必須定義getReceiver方法來得到一個[[org.apache.spark.streaming.receiver.Receiver]] 物件。Receiver物件會發送到工作結點來接收資料。

 我們也可以建立多個InputDStream來連線多個數據源,其中的ReceiverInputDStream都將啟動Receiver來接收資料。 一個Spark Streaming應用程式應該分配足夠多的核心(local模式下是執行緒)去執行receiver(s)並處理其接收的資料。當我們以本地模式執行Spark Streaming程式時,master URL不能指定為 local 或者 local[1] (Spark Streaming會啟動一個執行緒執行receiver,只有一個執行緒將導致沒有執行緒來處理資料),而應該是 local[n] ,這個n應該大於receiver的個數。在叢集中執行Spark Streaming程式時,同樣道理,也需要分配大於receiver的個數的核心數。

基本資料來源

Spark Streaming提供了從很多資料來源獲取流資料的方法,一些基本的資料來源可以通過StreamingContext API直接使用,主要包括:檔案系統、網路連線、Akka actors等。

檔案資料流

StreamingContext提供了從兼容於HDFS API的所有檔案系統中建立檔案資料輸入流的方法,如下:

ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

檔案流沒有receiver。Spark Streaming將監控對應目錄(但不支援巢狀目錄),並處理在該目錄中建立的任何檔案(以 . 開頭的將被忽略)。監控目錄中的檔案必須有相同的資料格式。監控目錄中的檔案如果被修改(比如以append方式寫入),這些修改將不會被讀取,因此正確的方式應該是先在其他目錄中寫好這些檔案並將其移動或者重新命名到該監控目錄。

對於簡單的文字檔案,可以使用更簡單的方法,如下:

ssc.textFileStream(dataDirectory)

網路資料流

網路連線流可以使用 ssc.socketStream() 或 ssc.socketTextStream() 建立。

Akka Actor流

可以通過 ssc.actorStream() 建立一個從Akka actor接收資料流的ReceiverInputDStream。

RDD序列流

我們也可以用 ssc.queueStream() 建立一個基於RDD序列的InputDStream。序列中的每一個RDD將被作為DStream中的一個數據批,這通常在測試你的Spark Streaming程式時非常有用。

高階資料來源

對於Kafka、Flume、Kinesis、Twitter等這些高階資料來源,則需要新增外部依賴。

自定義資料來源

你也可以自定義資料來源,只需要實現一個自己的receiver從自定義資料來源接收資料並將其推送到Spark。

Receiver可靠性

依據可靠性可將Receiver分為兩類。 可靠Receiver 帶有傳輸確認機制(ACK機制),可以確保資料在傳輸過程中不會丟失,Kafka和Flume等在ACK機制開啟的情況下就是可靠的。 不可靠Receiver 不帶有傳輸確認機制,包括不支援ACK機制和支援ACK但關閉的情形。

window

Spark Streaming也提供了基於視窗的計算,它允許你在一個滑動視窗上使用轉換操作,滑動視窗如下圖所示。


視窗是基於時間滑動的,視窗操作新形成的DStream中的每一個RDD包含了某一滑動視窗中的所有資料。任何視窗操作都需要指定如下兩個引數:

  • 視窗長度:它必須是源DStream批處理間隔的整數倍。
  • 滑動間隔:它必須是源DStream批處理間隔的整數倍。

一些常用的視窗操作運算元如下:

Transformation Meaning
window ( windowLength , slideInterval ) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow ( windowLength , slideInterval ) Return a sliding window count of elements in the stream.
reduceByWindow ( func , windowLength , slideInterval ) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func . The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow ( func , windowLength , slideInterval , [ numTasks ]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow ( func , invFunc , windowLength , slideInterval , [ numTasks ]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc ). Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow ( windowLength , slideInterval , [ numTasks ]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument.

需要強調的是,上述某些操作(如 reduceByWindow 和 reduceByKeyAndWindow 等)有一些特殊形式,通過只考慮新進入視窗的資料和離開視窗的資料,讓Spark增量計算歸約結果。這種特殊形式需要額外提供一個規約函式的逆函式,比如 + 對應的逆函式為 - 。對於較大的視窗,提供逆函式可以大大提高執行效率。