1. 程式人生 > >大資料求索(8):Spark Streaming簡易入門一

大資料求索(8):Spark Streaming簡易入門一

大資料求索(8):Spark Streaming簡易入門一

一、Spark Streaming簡單介紹

Spark Streaming是基於Spark Core上的一個應用程式,可伸縮,高吞吐,容錯(這裡主要是藉助Spark Core的容錯方式)處理線上資料流,資料可以有不同的來源,以及同時處理不同來源的資料。Spark Streaming處理的資料可以結合ML和Graph。

Spark Streaming本身是隨著流進來的資料按照時間為單位生成Job,然後觸發Job,在Cluster執行的一個流式處理引擎,本質上說是加上了時間維度的批處理。

二、Spark Streaming作用

Spark Streaming支援從多種資料來源中讀取資料,如Kafka,Flume,HDFS,Kinesis,Twitter等,並且可以使用高階函式如map,reduce,join,window等操作,處理後的資料可以儲存到檔案系統,資料庫,Dashboard等。

Spark Streming

三、Spark Streaming原理

Spark Streaming工作原理如下圖所示:

工作原理

  • 以時間為單位將資料流切分成離散的資料單位
  • 將每批資料看做RDD,使用RDD操作符處理
  • 最終結果以RDD為單位返回

輸入的資料流經過Spark Streaming的receiver,資料切分為DStream

(類似RDD,DStream是Spark Streaming中流資料的邏輯抽象),然後DStream被Spark Core的離線計算引擎執行並行處理。

簡言之,Spark Streaming就是把資料按時間切分,然後按傳統離線處理的方式計算。從計算流程角度看就是多了對資料收集和按時間節分。

四、Spark Streaming核心概念

4.1 Streaming Context

類似於Spark Core,要想使用Spark Streaming,需要建立一個StreamingContext物件作為入口。類似程式碼如下:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

觀察原始碼可以發現,其實StreamingContext實現還是建立了一個SparkContext。其中,第二個引數Seconds(1)表示1秒處理一次。目前還是基於秒級的計算,無法做到毫秒級。

Spark Streaming也可以通過傳入一個SparkContext物件來建立

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

當建立好上下文物件以後,需要做如下事情:

  1. 通過建立一個DStreams來定義輸入源
  2. 通過將transformation 和output 操作應用於DStream來定義流式計算
  3. 開始接收資料並使用streamingContext.start()啟動
  4. 等待使用streamingContext.awaitTermination()停止處理(手動或由於任何錯誤)
  5. 可以使用streamingContext.stop()手動停止處理

注意:

  • 一旦上下文已經啟動,就不能設定或新增新的流式計算。
  • 一旦上下文停止後,無法重新啟動。(不要和重新執行程式重新建立一個混淆)
  • 在JVM中只能同時執行一個StreamingContext
  • StreamingContext上的stop也會停止SparkContext。要僅停止StreamingContext,將stopSparkContext的stop的可選引數設定為false。
  • 只要在建立下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext),就可以重複使用SparkContext來建立多個StreamingContexts。

4.2 Discretized Streams (DStreams)

DStream可以說是SPark Streaming 的核心。它代表連續的資料流,可以是從源接收的輸入資料流,也可以由轉換操作生成的已經處理過的資料流。內部實現上,DStreams其實是一個連續的RDD序列,其中的每個RDD都包含來自特定時間間隔的資料。如下圖所示:

Spark Streaming

任何應用在DStream上的操作都會轉換為底層RDD上的操作。對於詞頻統計來說,flatMap運算元應用於DStream每行中上的每一個RDD以生成單詞DStream的RDDs。如下圖所示:

Spark Streaming

4.2 Input DStreams 和 Receivers

Input DStreams代表從Streaming源接收到的資料構成的DStreams。每一個Input DStream(除了file stream)都與一個Receiver物件相關聯,這個物件從源接收資料並將其儲存在Spark的記憶體中進行處理。

Spark Streaming提供兩種策略用於內建的streaming源

  • basic sources:

    sources直接在StreamingContext的API中得到,如檔案系統、socket連線等

  • advanced sources:

    像Kafka、Flume等源,可通過額外的工具類獲得,這些需要引入額外的jar包。

注意,如果想在streaming程式中並行接收多個streams資料,可以建立多個input DStreams。這將建立多個Receivers,這些receiver將同時接收多個數據流。但是,Spark的worker/exector是一個長期執行的任務,因此它佔用了分配給Spark Streaming應用程式的其中一個核(執行緒)。所以採用local模式執行,至少分配2個核,不然無法進行下面的計算。

注意點:

  • 在本地執行Spark Streaming程式時,請勿使用“local”或“local [1]”作為主URL。這兩種模式都意味著只有一個執行緒用於本地執行任務。如果此時再用receiver接收Input DStream,這單個執行緒便用來執行Receiver,不會用於接下來的處理資料了。因此,採用“local[n]”,這裡的n要大於Receiver的數量。
  • 在叢集上執行時,分配給Spark Streaming應用程式的核心數必須大於接收器數。否則系統將接收資料,但無法處理資料。

4.3 Basic Sources

上面的舉例中,是從套接字中接收文字資訊建立一個DStream。除了sockets,StreamingContext API提供了從檔案作為輸入源建立DStream的方法。

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

fileStream:用於從與HDFS API相容的任何檔案系統上的檔案中讀取資料(如HDFS、S3、NFS等),建立一個DStream。

Spark Streaming將會監視傳入的dataDirectory目錄,並處理在這個目錄下建立的所有檔案,但不支援巢狀目錄

注意:

  • 檔案必須有相同的格式
  • 必須通過原子move或rename的到此目錄下的檔案
  • 一旦移動,這個檔案將不能改變。因此,即使這個檔案後面被追加了資料,新資料也不會被讀取。

下一篇將繼續介紹。