1. 程式人生 > >Spark Streaming工作原理分析與使用

Spark Streaming工作原理分析與使用

Spark Streaming入門

1. 概述

Spark Streaming Spark Core API 的擴充套件, 它支援彈性的, 高吞吐的, 容錯的實時資料流的處理。

spark streaming提供是一種分散式計算能力。

資料來源

資料可以通過多種資料來源獲取, 例如 Kafka, Flume以及 TCP sockets, 也可以通過例如 map, reduce, join, window 等的高階函式組成的複雜演算法處理。

最終, 處理後的資料可以輸出到檔案系統, 資料庫以及實時儀表盤中. 事實上, 你還可以在 data streams(資料流)上使用 機器學習 以及 圖計算 演算法.

工作原理

Spark Streaming是用來實時處理資料的,但是會把一定的時間間隔的資料當做一個批次去處理,在單位時間間隔內就相當於處理離線的資料了。底層操作其實還是基於RDD的。你可以自定義單位時間間隔的大小。

 

在內部, 它工作原理如下, Spark Streaming 接收實時輸入資料流並將資料切分成多個 batch(批)資料, 然後由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果).

 

Spark Streaming 提供了一個名為 discretized stream DStream 的高階抽象, 它代表一個連續的資料流. DStream

可以從資料來源的輸入資料流建立, 例如 Kafka, Flume 以及 Kinesis, 或者在其他 DStream 上進行高層次的操作以建立. 在內部, 一個 DStream 是通過一系列的 RDDs 來表示.

DStream

它代表了一個連續的資料流,在內部, 一個 DStream 被表示為一系列連續的 RDDs, 在一個 DStream 中的每個 RDD 包含來自一定的時間間隔的資料。

 

應用於 DStream 的任何操作轉化為對於底層的 RDDs 的操作.

 

Spark Streaming資料來源

l Basic sources(基礎的資料來源): StreamingContext API

中直接可以使用的資料來源.

可以通過StreamingContext點出來的api都是基礎資料來源;

l Advanced sources(高階的資料來源): Kafka, Flume, Kinesis, 等等這樣的資料來源.

通過第三方的框架封裝的獲取資料api,稱之為高階資料來源。

Setting the Right Batch Interval (設定正確的批次間隔)

對於在叢集上穩定地執行的 Spark Streaming application, 該系統應該能夠處理資料儘可能快地被接收.換句話說, 應該處理批次的資料就像生成它們一樣快.這是否適用於 application 可以在 monitoring streaming web UI 中的 processing times 中被找到, processing time (批處理處理時間)應小於 batch interval (批間隔).

取決於 streaming computation (流式計算)的性質, 使用的 batch interval (批次間隔)可能對處理由應用程式持續一組固定的 cluster resources (叢集資源)的資料速率有重大的影響.例如, 讓我們考慮早期的 WordCountNetwork 示例.對於特定的 data rate (資料速率), 系統可能能夠跟蹤每 2 秒報告 word counts (即 2 秒的 batch interval (批次間隔)), 但不能每 500 毫秒.因此, 需要設定 batch interval (批次間隔), 使預期的資料速率在生產可以持續.

為您的應用程式找出正確的 batch size (批量大小)的一個好方法是使用進行測試 conservative batch interval (保守的批次間隔)(例如 5-10 秒)和 low data rate (低資料速率).驗證是否系統能夠跟上 data rate (資料速率), 可以檢查遇到的 end-to-end delay (端到端延遲)的值通過每個 processed batch (處理的批次)(在 Spark driver log4j 日誌中查詢 “Total delay” , 或使用 StreamingListener 介面). 如果 delay (延遲)保持與 batch size (批量大小)相當, 那麼系統是穩定的.除此以外, 如果延遲不斷增加, 則意味著系統無法跟上, 因此不穩定.一旦你有一個 stable configuration (穩定的配置)的想法, 你可以嘗試增加 data rate and/or 減少 batch size .請注意, momentary increase (瞬時增加)由於延遲暫時增加只要延遲降低到 low value (低值), 臨時資料速率增加就可以很好(即, 小於 batch size (批量大小)).

2. spark streaming入門案例

2.1.1. Spark streaming  程式設計模型

注意:最後的 ssc.start()ssc.awaitTermination()容易不寫

setMaster("local[*]")至少分分配兩個核,一個接受資料,一個處理資料。

一個StreamingContext建立多個input Dstream,會建立多個ReceiverSpark會為每個Receiver 分配一個core用於其執行。

故若SparkStreaming 程式一共分配了kcore,執行nReceiver,應保證k>n,這時會有ncore用於執行Receiver接收外部資料,k-ncore用於真正的計算。

eg:

val lines = ssc.socketTextStream("192.168.80.10", 9999) // Receiver, job-0input job 1 core 長駐程序,

val lines = ssc.socketTextStream("192.168.80.10", 9998) // Receiver, job-1 1 core

一個Receiver 佔用了一個core,這裡兩個Receiver佔用了2個core,如果這個job的啟動資源是 --master "local[4]" 那麼真正能用於運算的core只有兩個了。

val conf = new SparkConf().setMaster("local[*]").setAppName("ck")val ssc = new StreamingContext(conf,Seconds(2))

邏輯程式碼

// 啟動spark streaming app

ssc.start()

// 等待被終止, main 阻塞ssc.awaitTermination()

streaming wordcount

2.1.2. 匯入依賴

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming_2.11</artifactId>

    <version>2.2.0</version>

</dependency>

2.1.3. socket獲取資料(nc -lk )並統計wordcount

安裝nc命令,yum install -y nc

nc(netcat),一般我們多用在區域網內傳送檔案(scp多用在跳板機存在的情況),

nc -lk 9999

監聽9999這個埠就可向這個埠中傳輸資料了

Spark streaming就可以實時的從這個埠中獲得資料

啟動nc命令,並監聽一個埠

啟動程式:

/**  * author: sheep.Old   * qq: 64341393  * Created 2018/6/14  */object WordCount {def main(args: Array[String]): Unit = {// 建立一個具有兩個工作執行緒(working thread)並且批次間隔為 2 秒的本地 StreamingContext .        // master 需要 2 個核, 以防止飢餓情況(starvation scenario.val sparkConf = new SparkConf()        sparkConf.setMaster("local[*]")        sparkConf.setAppName("wordcount")// 2 seconds 對資料進行一次切分// 啟動有別的執行緒用來接受資料和計算資料val ssc = new StreamingContext(sparkConf, Seconds(2))// 獲取資料 (基礎資料來源) ReceiverInputDStream DStream子類val stream = ssc.socketTextStream("10.172.50.11", 52020)// 計算 wordcount a b c dval words = stream.flatMap(_.split(" "))// 將單詞和1進行組合 (word, 1val wordAndOne = words.map(word => (word, 1))// 分組聚合,統計單詞出現的次數val wordsCount = wordAndOne.reduceByKey(_ + _)// 列印結果wordsCount.print()// 釋放資源ssc.start() // 啟動spark streaming appssc.awaitTermination() // 等待被終止, main 阻塞}}

總結:

只能統計當前批次的結果資料;但是我們可以基於MySQL或者Redis來實現帶歷史狀態的統計,當然streaming也給我們提供了一個updateStateByKey來實現歷史狀態的統計,但是使用該方法的時候需要做checkpoint

問題:

1) 修改本地yum源為網路的yum源?

cd /etc/yum.repos.d/

mv CentOS-Media.repo CentOS-Media.repo.bak

mv CentOS-Base.repo.bak CentOS-Base.repo

2) import org.apache.spark.streaming.Seconds

3) pom.xml檔案不要倒錯包,尤其注意scala sdk的版本

DStream transformations

DStream上的轉化運算元:注意這些運算元RDD上沒有

RDD 類似,transformation 允許從 input DStream 輸入的資料做修改. DStreams 支援很多在 RDD 中可用的 transformation 運算元。一些常用的如下所示 :

RDD類似,類似,transformation 允許修改來自 input DStream 的資料. DStreams 支援標準的 Spark RDD 上可用的許多轉換. 一些常見的如下.

Transformation(轉換)

Meaning(含義)

map(func)

利用函式 func 處理原 DStream 的每個元素,返回一個新的 DStream.

flatMap(func)

與 map 相似,但是每個輸入項可用被對映為 0 個或者多個輸出項。.

filter(func)

返回一個新的 DStream,它僅僅包含原 DStream 中函式 func 返回值為 true 的項.

repartition(numPartitions)

通過建立更多或者更少的 partition 以改變這個 DStream 的並行級別(level of parallelism).

union(otherStream)

返回一個新的 DStream,它包含源 DStream 和 otherDStream 的所有元素.

count()

通過 count 源 DStream 中每個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream.

reduce(func)

利用函式 func 聚集源 DStream 中每個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函式應該是相關聯的,以使計算可以並行化.

countByValue()

在元素型別為 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每個 key 的值是在原 DStream 的每個 RDD 中的次數.

reduceByKey(func, [numTasks])

當在一個由 (K,V) pairs 組成的 DStream 上呼叫這個運算元時,返回一個新的, 由 (K,V) pairs 組成的 DStream,每一個 key 的值均由給定的 reduce 函式聚合起來. 注意:在預設情況下,這個運算元利用了 Spark 預設的併發任務數去分組。你可以用 numTasks 引數設定不同的任務數。

join(otherStream, [numTasks])

當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream.

cogroup(otherStream, [numTasks])

當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組).

transform(func)

通過對源 DStream 的每個 RDD 應用 RDD-to-RDD 函式,建立一個新的 DStream. 這個可以在 DStream 中的任何 RDD 操作中使用.

updateStateByKey(func)

返回一個新的 "狀態" 的 DStream,其中每個 key 的狀態通過在 key 的先前狀態應用給定的函式和 key 的新 valyes 來更新. 這可以用於維護每個 key 的任意狀態資料.

其中一些轉換值得深入討論.

UpdateStateByKey 操作

 updateStateByKey 操作允許您維護任意狀態,同時不斷更新新資訊. 你需要通過兩步來使用它.

00001. 定義 state - state 可以是任何的資料型別.

00002. 定義 state update function(狀態更新函式) - 使用函式指定如何使用先前狀態來更新狀態,並從輸入流中指定新值.

在每個 batch 中,Spark 會使用狀態更新函式為所有已有的 key 更新狀態,不管在 batch 中是否含有新的資料。如果這個更新函式返回一個 none,這個 key-value pair 也會被消除.

DStream Actions

Output Operation

Meaning

print()

在執行流應用程式的 driver 節點上的DStream中列印每批資料的前十個元素. 這對於開發和除錯很有用. I 這在 Python API 中稱為 pprint().

saveAsTextFiles(prefix, [suffix])

將此 DStream 的內容另存為文字檔案. 每個批處理間隔的檔名是根據 字首 和 字尾 : "prefix-TIME_IN_MS[.suffix]" 生成的.

saveAsObjectFiles(prefix, [suffix])

將此 DStream 的內容另存為序列化 Java 物件的 SequenceFiles. 每個批處理間隔的檔名是根據 字首 和 字尾 : "prefix-TIME_IN_MS[.suffix]" 生成的. PI 這在Python API中是不可用的.

saveAsHadoopFiles(prefix, [suffix])

將此 DStream 的內容另存為 Hadoop 檔案. 每個批處理間隔的檔名是根據 字首 和 字尾 : "prefix-TIME_IN_MS[.suffix]" 生成的. 這在Python API中是不可用的.

foreachRDD(func)

對從流中生成的每個 RDD 應用函式 func 的最通用的輸出運算子. 此功能應將每個 RDD 中的資料推送到外部系統, 例如將 RDD 儲存到檔案, 或將其通過網路寫入資料庫. 請注意, 函式 func 在執行流應用程式的 driver 程序中執行, 通常會在其中具有 RDD 動作, 這將強制流式傳輸 RDD 的計算.

2.1.4. foreachRDD 設計模式的使用

dstream.foreachRDD 是一個強大的原語, 允許將資料傳送到外部系統.但是, 瞭解如何正確有效地使用這個原語很重要. 避免一些常見的錯誤如下.

通常向外部系統寫入資料需要建立連線物件(例如與遠端伺服器的 TCP 連線), 並使用它將資料傳送到遠端系統.為此, 開發人員可能會無意中嘗試在Spark driver 中建立連線物件, 然後嘗試在Spark工作人員中使用它來在RDD中儲存記錄.例如(在 Scala 中):

2.1.5. foreachRDD的使用

可以使用foreachRDDDStream轉換成RDD進行操作:

// 方式一:以下是使用對DStream進行操作val words = stream.flatMap(_.split(" "))  // 計算 wordcount a b c dval wordAndOne = words.map(word => (word, 1))  // 將單詞和1進行組合 (word, 1val wordsCount = wordAndOne.reduceByKey(_ + _) // 分組聚合,統計單詞出現的次數wordsCount.print() // 列印結果,觸發action// 方式二:將DStream轉換成RDD進行操作,和spark core操作方式一樣stream.foreachRDD(rdd => {// 可以選擇對DStream操作,也可以選擇對RDD進行操作,擅長什麼就使用什麼val wordsCount = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)    wordsCount.foreachPartition(partition => {// 獲取一根連線 DriverManager.getConnection(....)        /*向資料庫中寫入資料,寫入之前要判斷一下插入的單詞是否已經存在,如果已存在累加,不存在插入*/partition.foreach()// 將連線close})})

foreachRDD在使用連線的時候可以進行如下優化:

通常, 建立連線物件具有時間和資源開銷. 因此, 建立和銷燬每個記錄的連線物件可能會引起不必要的高開銷, 並可顯著降低系統的總體吞吐量. 一個更好的解決方案是使用rdd.foreachPartition - 建立一個連線物件, 並使用該連線在 RDD 分割槽中傳送所有記錄.

dstream.foreachRDD{rdd=>

rdd.foreachPartition{partitionOfRecords=>

// ConnectionPool is a static, lazily initialized pool of connections

valconnection=ConnectionPool.getConnection()

partitionOfRecords.foreach(record=>connection.send(record))

ConnectionPool.returnConnection(connection)// return to the pool for future reuse

}

2.1.6. checkpoint

Ø Metadata checkpointing - 將定義 streaming 計算的資訊儲存到容錯儲存(如 HDFS)中.這用於從執行 streaming 應用程式的 driver 的節點的故障中恢復(稍後詳細討論). 元資料包括:

Configuration - 用於建立流應用程式的配置.

DStream operations - 定義 streaming 應用程式的 DStream 操作集.

Incomplete batches - 批量的job 排隊但尚未完成.

Ø Data checkpointing - 將生成的 RDD 儲存到可靠的儲存.這在一些將多個批次之間的資料進行組合的 狀態 變換中是必需的.在這種轉換中, 生成的 RDD 依賴於先前批次的 RDD, 這導致依賴鏈的長度隨時間而增加.為了避免恢復時間的這種無限增加(與依賴關係鏈成比例), 有狀態轉換的中間 RDD 會定期 checkpoint 到可靠的儲存(例如 HDFS)以切斷依賴關係鏈.

何時啟用 checkpoint

對於具有以下任一要求的應用程式, 必須啟用 checkpoint:

· 使用狀態轉換 - 如果在應用程式中使用 updateStateByKey reduceByKeyAndWindow(具有反向功能), 則必須提供 checkpoint 目錄以允許定期的 RDD checkpoint.

· 從執行應用程式的 driver 的故障中恢復 - 元資料 checkpoint 用於使用進度資訊進行恢復.

請注意, 無需進行上述有狀態轉換的簡單 streaming 應用程式即可執行, 無需啟用 checkpoint. 在這種情況下, 驅動器故障的恢復也將是部分的(一些接收但未處理的資料可能會丟失). 這通常是可以接受的, 許多執行 Spark Streaming 應用程式. 未來對非 Hadoop 環境的支援預計會有所改善.

如何配置 checkpoint

可以通過在儲存 checkpoint 資訊的容錯, 可靠的檔案系統(例如, HDFS, S3等)中設定目錄來啟用 checkpoint. 這是通過使用 streamingContext.checkpoint(checkpointDirectory) 完成的. 這將允許您使用上述有狀態轉換. 另外, 如果要使應用程式從 driver 故障中恢復, 您應該重寫 streaming 應用程式以具有以下行為.

· 當程式第一次啟動時, 它將建立一個新的 StreamingContext, 設定所有流, 然後呼叫 start().

· 當程式在失敗後重新啟動時, 它將從 checkpoint 目錄中的 checkpoint 資料重新建立一個 StreamingContext.

updatestateByKey

2.1.7. updateStateByKey實現wordcount歷史狀態統計

object WordCountPlus {// 遮蔽日誌Logger.getLogger("org").setLevel(Level.WARN)/**      * newValues: Seq[Int]  : 當前批次某個單詞出現的次數  a Seq(1,1,1,1)      * runningCount: Option[Int]: 上次a的值 a=None      */val updateFunction = (newValues: Seq[Int], runningCount: Option[Int]) => {val newCount = newValues.sum + runningCount.getOrElse(0)Some(newCount)    }// 建立一個新的StreamingContext例項val function2CreateContext = () => {println("------------------------------")val conf = new SparkConf()        conf.setMaster("local[*]")        conf.setAppName(this.getClass.getSimpleName)val ssc = new StreamingContext(conf, Seconds(2))        ssc.checkpoint("./ckpt")val stream = ssc.socketTextStream("10.172.50.11", 44444)val wordCountResult = stream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)        wordCountResult.print()        ssc    }def main(args: Array[String]): Unit = {/**          * 嘗試從checkpoint目錄中恢復以往的streamingcontext例項* 如果恢復不了,則建立一個新的*/val ssc = StreamingContext.getOrCreate("./ckpt", function2CreateContext)        ssc.start()        ssc.awaitTermination()    }

Window Operations(視窗操作)

Spark Streaming 也支援 windowed computations(視窗計算),它允許你在資料的一個滑動視窗上應用 transformation(轉換). 下圖說明了這個滑動視窗.

如上圖顯示,視窗在源 DStream 上 slides(滑動),合併和操作落入窗內的源 RDDs,產生視窗化的 DStream 的 RDDs。在這個具體的例子中,程式在三個時間單元的資料上進行視窗操作,並且每兩個時間單元滑動一次。 這說明,任何一個視窗操作都需要指定兩個引數.

· window length(視窗長度) - 視窗的持續時間(圖 3).

· sliding interval(滑動間隔) - 執行視窗操作的間隔(圖 2).

這兩個引數必須是 source DStream 的 batch interval(批間隔)的倍數(圖 1).

讓我們舉例以說明視窗操作. 例如,你想擴充套件前面的例子用來計算過去 30 秒的詞頻,間隔時間是 10 秒. 為了達到這個目的,我們必須在過去 30 秒的 (wrod, 1) pairs 的 pairs DStream 上應用 reduceByKey 操作. 用方法 reduceByKeyAndWindow 實現.

Transformation(轉換) Meaning(含義)

window(windowLength, slideInterval)

返回一個新的 DStream, 它是基於 source DStream 的視窗 batch 進行計算的.

countByWindow(windowLength, slideInterval)

返回 stream(流)中滑動視窗元素的數

reduceByWindow(func, windowLength, slideInterval)

返回一個新的單元素 stream(流),

它通過在一個滑動間隔的 stream 中使用 func 來聚合以建立. 該函式應該是 associative(關聯的)且 commutative(可交換的),以便它可以平行計算

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

在一個 (K, V) pairs DStream 上呼叫時, <