大資料求索(8):Spark Streaming簡易入門一
大資料求索(8):Spark Streaming簡易入門一
一、Spark Streaming簡單介紹
Spark Streaming是基於Spark Core上的一個應用程式,可伸縮,高吞吐,容錯(這裡主要是藉助Spark Core的容錯方式)處理線上資料流,資料可以有不同的來源,以及同時處理不同來源的資料。Spark Streaming處理的資料可以結合ML和Graph。
Spark Streaming本身是隨著流進來的資料按照時間為單位生成Job,然後觸發Job,在Cluster執行的一個流式處理引擎,本質上說是加上了時間維度的批處理。
二、Spark Streaming作用
Spark Streaming支援從多種資料來源中讀取資料,如Kafka,Flume,HDFS,Kinesis,Twitter等,並且可以使用高階函式如map,reduce,join,window等操作,處理後的資料可以儲存到檔案系統,資料庫,Dashboard等。
三、Spark Streaming原理
Spark Streaming工作原理如下圖所示:
- 以時間為單位將資料流切分成離散的資料單位
- 將每批資料看做RDD,使用RDD操作符處理
- 最終結果以RDD為單位返回
輸入的資料流經過Spark Streaming的receiver,資料切分為DStream
簡言之,Spark Streaming就是把資料按時間切分,然後按傳統離線處理的方式計算。從計算流程角度看就是多了對資料收集和按時間節分。
四、Spark Streaming核心概念
4.1 Streaming Context
類似於Spark Core,要想使用Spark Streaming,需要建立一個StreamingContext物件作為入口。類似程式碼如下:
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
觀察原始碼可以發現,其實StreamingContext實現還是建立了一個SparkContext。其中,第二個引數Seconds(1)表示1秒處理一次。目前還是基於秒級的計算,無法做到毫秒級。
Spark Streaming也可以通過傳入一個SparkContext物件來建立
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
當建立好上下文物件以後,需要做如下事情:
- 通過建立一個DStreams來定義輸入源
- 通過將transformation 和output 操作應用於DStream來定義流式計算
- 開始接收資料並使用streamingContext.start()啟動
- 等待使用streamingContext.awaitTermination()停止處理(手動或由於任何錯誤)
- 可以使用streamingContext.stop()手動停止處理
注意:
- 一旦上下文已經啟動,就不能設定或新增新的流式計算。
- 一旦上下文停止後,無法重新啟動。(不要和重新執行程式重新建立一個混淆)
- 在JVM中只能同時執行一個StreamingContext
- StreamingContext上的stop也會停止SparkContext。要僅停止StreamingContext,將stopSparkContext的stop的可選引數設定為false。
- 只要在建立下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext),就可以重複使用SparkContext來建立多個StreamingContexts。
4.2 Discretized Streams (DStreams)
DStream可以說是SPark Streaming 的核心。它代表連續的資料流,可以是從源接收的輸入資料流,也可以由轉換操作生成的已經處理過的資料流。內部實現上,DStreams其實是一個連續的RDD序列,其中的每個RDD都包含來自特定時間間隔的資料。如下圖所示:
任何應用在DStream上的操作都會轉換為底層RDD上的操作。對於詞頻統計來說,flatMap運算元應用於DStream每行中上的每一個RDD以生成單詞DStream的RDDs。如下圖所示:
4.2 Input DStreams 和 Receivers
Input DStreams代表從Streaming源接收到的資料構成的DStreams。每一個Input DStream(除了file stream)都與一個Receiver物件相關聯,這個物件從源接收資料並將其儲存在Spark的記憶體中進行處理。
Spark Streaming提供兩種策略用於內建的streaming源
-
basic sources:
sources直接在StreamingContext的API中得到,如檔案系統、socket連線等
-
advanced sources:
像Kafka、Flume等源,可通過額外的工具類獲得,這些需要引入額外的jar包。
注意,如果想在streaming程式中並行接收多個streams資料,可以建立多個input DStreams。這將建立多個Receivers,這些receiver將同時接收多個數據流。但是,Spark的worker/exector是一個長期執行的任務,因此它佔用了分配給Spark Streaming應用程式的其中一個核(執行緒)。所以採用local模式執行,至少分配2個核,不然無法進行下面的計算。
注意點:
- 在本地執行Spark Streaming程式時,請勿使用“local”或“local [1]”作為主URL。這兩種模式都意味著只有一個執行緒用於本地執行任務。如果此時再用receiver接收Input DStream,這單個執行緒便用來執行Receiver,不會用於接下來的處理資料了。因此,採用“local[n]”,這裡的n要大於Receiver的數量。
- 在叢集上執行時,分配給Spark Streaming應用程式的核心數必須大於接收器數。否則系統將接收資料,但無法處理資料。
4.3 Basic Sources
上面的舉例中,是從套接字中接收文字資訊建立一個DStream。除了sockets,StreamingContext API提供了從檔案作為輸入源建立DStream的方法。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
fileStream:用於從與HDFS API相容的任何檔案系統上的檔案中讀取資料(如HDFS、S3、NFS等),建立一個DStream。
Spark Streaming將會監視傳入的dataDirectory目錄,並處理在這個目錄下建立的所有檔案,但不支援巢狀目錄
注意:
- 檔案必須有相同的格式
- 必須通過原子move或rename的到此目錄下的檔案
- 一旦移動,這個檔案將不能改變。因此,即使這個檔案後面被追加了資料,新資料也不會被讀取。
下一篇將繼續介紹。