1. 程式人生 > >Spark Streaming 教程文件--概述、基本概念、效能調優

Spark Streaming 教程文件--概述、基本概念、效能調優

SparkStreaming教程

本文章主要講述SparkStreaming概念原理、基本概念、以及調優等一些知識點。

1      概述

1.1  SparkStreaming是什麼

Spark Streaming 是個批處理的流式(實時)計算框架。其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用於處理實時資料流。

   支援從多種資料來源獲取資料,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis以及TCP sockets,從資料來源獲取資料之後,可以使用諸如map、reduce、join等高階函式進行復雜演算法的處理。最後還可以將處理結果儲存到檔案系統,資料庫等。

Spark Streaming處理的資料流圖:





以上的連續4個圖,分別對應以下4個段落的描述:

  • Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入資料,進行處理後,處理結果儲存在HDFS、Databases等各種地方。
  • Spark Streaming接收這些實時輸入資料流,會將它們按批次劃分,然後交給Spark引擎處理,生成按照批次劃分的結果流。
  • Spark Streaming提供了表示連續資料流的、高度抽象的被稱為離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操作都會轉變為對底層RDD的操作。
  • Spark Streaming使用資料來源產生的資料流建立DStream,也可以在已有的DStream上使用一些操作來建立新的DStream。

1.2    2. Spark Streaming能做什麼

目前而言SparkStreaming 主要支援以下三種業務場景

  • 無狀態操作:只關注當前批次中的實時資料,例如:
  1. 商機標題分類,分類http請求端 -> kafka -> Spark Streaming -> http請求端Map -> 響應結果
  2. 網庫Nginx訪問日誌收集,flume->kafka -> Spark Streaming -> hive/hdfs
  3. 資料同步,網庫主站資料通過“主站”->kafka->Spark Streaming -> hive/hdfs
  • 有狀態操作:對有狀態的DStream進行操作時,需要依賴之前的資料 除了當前新生成的小批次資料,但還需要用到以前所生成的所有的歷史資料。新生成的資料與歷史資料合併成一份流水錶的全量資料例如:
  1. 實時統計網庫各個站點總的訪問量
  2. 實時統計網庫每個商品的總瀏覽量,交易量,交易額。
  • 視窗操作:定時對指定時間段範圍內的DStream資料進行操作,例如:
  1.  網庫主站的惡意訪問、爬蟲,每10分鐘統計30分鐘內訪問次數最多的使用者。

1.3        特性

1.3.1       優點:

  • 吞吐量大、速度快。
  • 容錯:SparkStreaming在沒有額外程式碼和配置的情況下可以恢復丟失的工作。checkpoint。
  • 社群活躍度高。生態圈強大。
  • 資料來源廣泛。

1.3.2  缺點:

  •  延遲。500毫秒已經被廣泛認為是最小批次大小,這個相對storm來說,還是大很多。所以實際場景中應注意該問題,就像標題分類場景,設定的0.5s一批次,加上處理時間,分類介面會佔用1s的響應時間。實時要求高的可選擇使用其他框架。

2   基礎概念-開發

2.1  簡單示例

2.1.1  Word count詞頻計算demo

object NetworkWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

2.1.2  說明

1.  通過建立輸入DStreams來定義輸入源。

2.  通過將轉換和輸出操作應用於DStream來定義流式計算。

3.  開始接收資料並使用它進行處理streamingContext.start()

4.  等待處理停止(手動或由於任何錯誤)使用streamingContext.awaitTermination()

5.  可以手動停止處理streamingContext.stop()

2.1.3  注意

1.  一旦上下文開始,就不能設定或新增新的流計算。

2.  一旦上下文停止,它將無法重新啟動。

3.  只有一個StreamingContext可以在JVM中同時處於活動狀態。

2.2  輸入源

Spark Streaming提供了兩類輸入源。

·        基本來源:StreamingContextAPI中直接提供的資源。示例:檔案系統,套接字連線。

1.檔案系統:streamingContext.fileStream(hdfsDataDirectory)

SparkStreaming將監聽目錄dataDirectory並處理在該目錄中建立的任何檔案(不支援巢狀目錄中寫入的檔案)
檔案必須具有相同的資料格式。
必須dataDirectory通過將資料原子移動重新命名為資料目錄來建立檔案。
移動後,檔案不能更改。因為,如果檔案被不斷附加,則不會讀取新的資料。

2.套接字連線:streamingContext.socketTextStream(hostname, port)

  Sparkstreaming監聽對應主機-埠,處理髮送到該埠的資料。

·       高階來源:Kafka,Flume等資源可以通過額外的實用類來獲得。

      實際應用場景中,Kafak使用較多,主要介紹Kafka的使用:

KafkaUtils.createStream(ssc, zkQuorum, groupId, topicsMap)

sscstreamingContext

zkQuorum:kafka元資料在zookeeper中的儲存地址(示例:node1:2181/kafka)

groupId:spark streaming接受kafka資料使用的使用者組id,可通過該引數控制每次接受kafka資料的索引位置,spark streaming每次啟動都會從該groupId上次接收到的資料位置開始接收。

topicsMap:Map[String, Int]型別物件,key對應接收的資料 topic名稱,value為執行緒數量。sparkstreaming接收kafka資料的啟動的執行緒數量,即併發量

 如果要在流式應用程式中並行接收多個數據流,則可以建立多個輸入DStream

2.3  DStream轉換操作

操作 含義
map(func)

通過傳遞源DStream的每個元素通過函式func返回一個新的DStream

flatMap(func) map類似,但每個輸入項可以對映到0個或更多的輸出項。
filter(func) 通過僅選擇func返回true的源DStream的記錄來返回新的DStream
repartition(numPartitions) 通過建立更多或更少的分割槽來更改此DStream中的並行級別。
union(otherStream) 返回一個新的DStream,它包含源DStream和otherDStream中元素的並集。
count() 通過計算源DStream的每個RDD中的元素數量來返回單元素RDD的新DStream
reduce(func) 通過使用函式func(它需要兩個引數並返回一個),通過聚合源DStream的每個RDD中的元素來返回單元素RDD的新DStream。該函式應該是關聯的,以便可以平行計算。
countByValue() 當呼叫型別為K的元素的DStream時,返回一個新的DStreamKLong)對,其中每個鍵的值是源DStream的每個RDD中的頻率。
reduceByKey(func,[numTasks]) 當(KV)對的DStream被呼叫時,返回(KV)對的新DStream,其中使用給定的reduce函式聚合每個鍵的值。注意:預設情況下,使用Spark的預設並行任務數(2為本地模式,群集模式中的數字由config屬性決定spark.default.parallelism)進行分組。您可以傳遞可選numTasks引數來設定不同數量的任務。
join(otherStream,[numTasks]) 當(KV)和(KW)對的兩個DStream被呼叫時,返回一個新的(K,(VW))對的DStream與每個鍵的所有元素對。
cogroup(otherStream,[numTasks]) 當呼叫(KV)和(KW)對的DStream時,返回一個新的DStreamKSeq [V]Seq [W])元組。
transform(func) 通過對源DStream的每個RDD應用RDDRDD函式來返回一個新的DStream。這可以用於對DStream進行任意RDD操作。
updateStateByKey(func) 返回一個新的“狀態”DStream,其中通過對鍵的先前狀態應用給定的功能和鍵的新值來更新每個鍵的狀態。這可以用於維護每個金鑰的任意狀態資料。

Transform操作: 

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...)
val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...)  ...
})

UpdateStateByKey 操作:

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    //產生我們需要的pair rdd
    val linerdd = lines.map{row =>{
     ···
      (key, amt)
}}

val addFunc = (currValues: Seq[Int], preValueState: Option[Int]) =>{
      //通過spark內部的reducebykey按key規約,然後這裡傳入某key當前批次的seq,再計算key的總和
      val currentCount = currValues.sum
      //已經累加的值
      val previousCount = preValueState.getOrElse(0)
      //返回累加後的結果,是一個Option[Int]型別
      Some(currentCount + previousCount)
    }
    linerdd.updateStateByKey[Int](addFunc _).print()

Windows操作

下圖說明了這個視窗。

如圖:

1. 紅色的矩形就是一個視窗,視窗hold的是一段時間內的資料流。

2.這裡面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,視窗會slide一次。

所以基於視窗的操作,需要指定2個引數:

· window length - The duration of the window (3 inthe figure)

· slide interval - The interval at which the window-basedoperation is performed (2 in the figure).  

舉個例子吧:

還是以wordcount舉例,每隔10秒,統計一下過去30秒過來的資料。

val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))  

這裡的paris就是一個DStream,每條資料類似(word,1)

一些常見的視窗操作如下。所有這些操作都採用上述兩個引數 - windowLengthslideInterval

操作

含義

windowwindowLengthslideInterval

返回基於源DStream的視窗批次計算的新DStream。

countByWindowwindowLengthslideInterval

返回流中元素的滑動視窗數。

reduceByWindowfuncwindowLengthslideInterval

返回一個新的單元素流,通過使用func在滑動間隔中通過在流中聚合元素建立。

reduceByKeyAndWindowfuncwindowLengthslideInterval,[ numTasks ])

當對(K,V)對的DStream進行呼叫時,返回(K,V)對的新DStream,其中每個鍵的值 在滑動視窗中使用給定的減少函式func進行聚合

countByValueAndWindowwindowLength, slideInterval,[numTasks ])

當呼叫(K,V)對的DStream時,返回(K,Long)對的新DStream,其中每個鍵的值是其滑動視窗內的頻率。

2.4 DStream的輸出操作

輸出操作允許將DStream的資料推送到外部系統,如資料庫或檔案系統。由於輸出操作實際上允許外部系統使用變換後的資料,所以它們觸發所有DStream變換的實際執行(類似於RDD的動作)。目前,定義了以下輸出操作:

操作

含義

print()

在執行流應用程式的驅動程式節點上的DStream中列印每批資料的前十個元素。

saveAsTextFilesprefix,[ suffix ])

將此DStream的內容另存為文字檔案。基於產生在每批間隔的檔名的字首字尾“字首TIME_IN_MS [.suffix]”

saveAsObjectFilesprefix,[ suffix ])

將DStream的內容儲存為SequenceFiles序列化的Java物件。基於產生在每批間隔的檔名的字首字尾“字首TIME_IN_MS [.suffix]”

saveAsHadoopFilesprefix,[ suffix])

將此DStream的內容另存為Hadoop檔案。基於產生在每批間隔的檔名的字首字尾字首TIME_IN_MS [.suffix]”。 

foreachRDDfunc

對從流中生成的每個RDD 應用函式func的最通用的輸出運算子。此功能應將每個RDD中的資料推送到外部系統,例如將RDD儲存到檔案,或將其通過網路寫入資料庫。請注意,函式func在執行流應用程式的驅動程式程序中執行,通常會在其中具有RDD動作,從而強制流式傳輸RDD的計算。

注意

  • DStreams通過輸出操作進行延遲執行,就像RDD由RDD操作懶惰地執行。具體來說,DStream輸出操作中的RDD動作強制處理接收到的資料。因此,如果您的應用程式沒有任何輸出操作,或者具有輸出操作,比如dstream.foreachRDD()沒有任何RDD操作,那麼任何操作都不會被執行。系統將簡單地接收資料並將其丟棄。
  • 預設情況下,輸出操作是一次一個執行的。它們按照它們在應用程式中定義的順序執行。

2.5 DataFrame和SQL操作

可以輕鬆地在流資料上使用DataFrames和SQL操作。您必須使用StreamingContext正在使用的SparkContext建立一個SparkSession。此外,必須這樣做,以便可以在驅動程式故障時重新啟動。

這在下面的示例中顯示。將每個RDD轉換為DataFrame,註冊為臨時表,然後使用SQL進行查詢。

val words: DStream[String] = ...
words.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

2.6  MLlib操作

可以通過訓練出個模型,然後將模型作為廣播變數,在DStream操作中使用該模型預測相關資料。

2.7 快取/永續性

與RDD類似,DStreams還允許開發人員將流的資料保留在記憶體中。也就是說,使用persist()DStream上的方法將自動將該DStream的每個RDD保留在記憶體中。如果DStream中的資料將被多次計算(例如,相同資料上的多個操作),這是非常有用的。對於基於視窗的操作,像reduceByWindowreduceByKeyAndWindow和基於狀態的操作一樣updateStateByKey,這是隱含的。因此,基於視窗的操作生成的DStreams將自動保留在記憶體中,無需開發人員的呼叫persist()

2.8  CheckPoint/檢查點

流式應用程式必須全天候執行,因此必須能夠適應與應用程式邏輯無關的故障(例如,系統故障,JVM崩潰等)。為了可以這樣做,Spark Streaming需要檢查足夠的資訊到容錯儲存系統,以便可以從故障中恢復。

2.8.1  如何使用CheckPoint

啟用 checkpoint,需要設定一個支援容錯的、可靠的檔案系統(如 HDFS、s3 等)目錄來儲存 checkpoint 資料。通過呼叫 streamingContext.checkpoint(checkpointDirectory)來完成。另外,如果你想讓你的application能從 driver 失敗中恢復,你的application 要滿足:

·        若 application為首次重啟,將建立一個新的 StreamContext 例項

·        如果 application是從失敗中重啟,將會從 checkpoint 目錄匯入 checkpoint 資料來重新建立 StreamingContext 例項

通過 StreamingContext.getOrCreate可以達到目的:

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

```
context.start()
context.awaitTermination()

2.9  Spark Streaming程式提交

與spark提交方式一樣

spark-submit

--class:您的應用程式的入口點(例如org.apache.spark.examples.SparkPi

--master:叢集的主URL(例如spark://10.2.9.114:7077;提交到yarn叢集寫:yarn

--deploy-mode:是否將驅動程式部署在工作節點(cluster)或本地作為外部客戶端(client)(預設值:client

--conf:任意Spark配置屬性,key = value格式。對於包含空格的值,用引號括起“key = value”(如圖所示)。

application-jar:包含應用程式和所有依賴關係的捆綁jar的路徑。該URL必須在叢集內全域性可見,例如所有節點上存在的hdfs://路徑或file://路徑。

application-arguments:引數傳遞給主類的main方法,如果有的話。

提交到Yarn叢集的特殊引數:

--executor-memory    每個executor記憶體大小

--num-executors     executor數量

--executor-cores     executor cpu數量

3   效能調優                                       

3.1  合理設定批處理                                           

1.    通過有效利用叢集資源減少每批資料的處理時間。

2.    設定正確的批量大小,使得批量的資料可以像接收到的那樣快速處理(即資料處理與資料攝取保持一致)。

3.2  資料接收中的並行級別

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

3.3  記憶體優化

3.3.1  資料序列化

可以通過調優序列化格式來減少資料序列化的開銷

3.3.2  物件型別

例如HashMapLinkedList等一些結構佔用空間較大,可考慮優化使用物件型別。

3.4  廣播大變數

使用廣播功能可以大大減少群集上啟動作業的成本。

4   FAQ

4.1  executor間task分佈不均勻

task大都集中在特定的少數executor上執行,並行度不夠。

原因:

這些點為receiver所在節點。Receiver會將接收到的資料的第一個副本放在本地,另外的副本隨機分佈在其他節點。黨我們只設置一個副本時(e.g. MEMORY_ONLY_SER),資料會全部集中在receiver所在的幾個節點,task也會被優先分發到這些點上的executor中執行。

4.2  spark Streaming任務失敗

原因:Spark Streaming存在執行一定時間後失敗的問題

解決辦法:定時重啟Spark Streaming任務

未完待續···