1. 程式人生 > >spark-streaming學習筆記總結

spark-streaming學習筆記總結

基本介紹
Spark建立流式應用的本質,還是依賴了spark最核心的那些技術,只是在這些技術上又封裝了一層流式介面。
Spark的streaming機制簡單來說,就是將連續的時間序列切割成不同的離散時間段。針對某個時間段,將該時間段內的所有輸入資料組成一個RDD,接下來的工作就如同一個傳統的sprark應用一樣,對這個RDD進行各種變換,直到最終輸出資料。可以認為,Spark Streaming就是在時間維度上,為每個時間段都建立了同一個spark應用,這樣表面上看起來就像是流式的工作方式。如下圖所示。
其中每個方框都是一個RDD;一個橫排的所有RDD組成一個DStream;一個縱列的所有RDD是在某一個時刻的流式作業。DStream是在Spark Streaming程式設計中的一個基本資料單位(執行變換、輸出等操作),就像是Spark程式設計中的RDD一樣。
 
API的使用示例
在進行Streaming程式設計前,需要依賴spark-streaming_*.jar包,這個jar包已經包含在spark-assembly-*.jar裡了,所以如果之前已經加入了這個包的依賴,不需要額外的工作。另外,如果流式的資料來源是其他系統,如kafka、flume等,需要加入這些jar包依賴,各個jar包名如下所示,可以通過各方途徑得到。
Kafka        spark-streaming-kafka_2.10
Flume       spark-streaming-flume_2.10
Kinesis      spark-streaming-kinesis-asl_2.10[Amazon Software License]
Twitter     spark-streaming-twitter_2.10
ZeroMQ   spark-streaming-zeromq_2.10
MQTT       spark-streaming-mqtt_2.10
一個基本的流式應用如下所示。
importorg.apache.spark._
importorg.apache.spark.streaming._
importorg.apache.spark.streaming.StreamingContext._
 
// Create a localStreamingContext with two working thread and batch interval of 1 second.
// The masterrequires 2 cores to prevent from a starvation scenario.
 
val conf = newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = newStreamingContext(conf, Seconds(1))
val lines =ssc.socketTextStream("localhost", 9999)
val words =lines.flatMap(_.split(" "))
val pairs =words.map(word => (word, 1))
val wordCounts =pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
 
         可以看出,除了最後的ssc.start()和ssc.awaitTermination(),其餘部分和傳統spark程式設計向乎一模一樣。
StreamingContext
就像SparkContext是spark應用的入口一樣,StreamingContext是流式應用的入口。實際上,StreamingContext本身已經包含了SparkContext。初始化StreamingContext的方式:
newStreamingContext(conf, Seconds(1))
newStreamingContext(new SparkContext(…), Seconds(1))
         conf是一個SparkConf物件,這個物件的初始化已經在上文中介紹。其中Seconds(1)代表輸入的資料每一秒鐘打一個batch包。一個batch包內可以有多個block,一個block對應一個task。這個將在以後解釋。這個batch間隔可以理解成是流式中每個應用的執行週期。
Receiver
Receiver可分為以下幾類:
原生型別(fileStream、akka的actorStream、用於測試的由RDD佇列組成的queueStream),原生型別可以直接使用,不需要依賴其他包。
高階型別(Twitte、Flume、Kafak、Kinesis),這種型別需要依賴相應jar包。
使用者自定義型別,不討論。
 
對於每種輸入資料的DStream,都有一個Receiver物件與之相關聯,對於每個Receiver物件,又有一個Source與之相對應。每一個Receiver物件代表一個數據接收端例項(即只有一個executor使用一個core來接收資料,併發度為1),如果要提高併發度,可以通過建立多個Receiver物件來實現,方法如下。
val numStreams = 5
val kafkaStreams =(1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream =streamingContext.union(kafkaStreams)
unifiedStream.print()
         其中,streamingContext.union()把多個Receiver的資料合併。
給應用分配的cores數量必須大於Receiver數目,這樣才能保證每個Receiver佔用一個core的同時,至少還有另一個cores來處理資料。無論master是local的還是叢集的,都應該這樣配置,否則資料將得不到執行緒資源來處理。
DStream的變換
檢視相應api文件,很多和RDD的變換很像。其中著重介紹三種特殊的transform(func)、updateStateByKey(func)和window Operation。
1.      transform
transform(func)需要傳入函式型別是RDDto RDD,這個函式可以由使用者自己定義,目的是實現API所沒有提供的複雜變換。
2.      updateStateByKey
updateStateByKey(func)常用於有狀態的應用,它的基本功能是用當前的RDD資料,更新之前記錄下的狀態(狀態也是個RDD?),並將這個新的狀態記錄下來,用於下次一更新操作。一個示例如下:
defupdateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = … //將newValues累加到runningCount上,形成新的newCount
Some(newCount)
}
val runningCounts =pairs.updateStateByKey[Int](updateFunction _)
         以上程式碼實現了將每次的word記數累加到狀態上,並記錄下來。
3.      Window Operation
Window Operation簡單來說,就是在某個時間段,計算之前一定數量時間段內的所有RDD。如下圖所示。
         上圖中,從originalDStream到windowed DStream的變換,就是window Operation。這個變換分別在時間點3、5…計算之前3個時間段內的3個RDD,產生一個新的RDD。一個window Operation的示例如下所示。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int)=> (a + b), Seconds(30), Seconds(10))
         reduceByKeyAndWindow函式第一個引數是變換函式,第二個引數代表每次計算擷取該時間間隔內的所有RDD,第三個引數代表計算週期。即,每隔10秒鐘,計算前30秒內的每個key值對應資料之和。
         所有的windowOperation操作見API文件。
DStream輸出操作
DStream的輸出操作包括:print()、saveAsTextFiles(prefix,[suffix])、saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix,[suffix])、foreachRDD(func),具體說明見API文件。
其中,除foreachRDD(func)之外的所以操作都是直接將資料存成檔案,而foreachRDD(func)的作用是將資料網路傳輸到外部系統。其中func是RDD to Unit型別的函式。一種錯誤的使用方法是:
dstream.foreachRDD { rdd =>
valconnection = createNewConnection()  //executed at the driver
rdd.foreach{ record =>
connection.send(record) // executed at the worker
}
}
按道理,rdd =>{}這個函式應該在每個worker上呼叫createNewConnection()來建立網路連線,而實際上這個操作是在driver端完成的,因此有問題。解決辦法是將createNewConnection()呼叫挪到record =>{}操作中來實現,這又會導致每條record建立一個連線,效率太低,解決辦法是使用RDD的foreachPartition{}函式,這個函式將引數內的函式操作放到每個worker上執行,如下所示。
dstream.foreachRDD { rdd =>
rdd.foreachPartition{ partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool ofconnections
val connection = ConnectionPool.getConnection()
     partitionOfRecords.foreach(record=> connection.send(record))
ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
}
}
         其中ConnectionPool使用連線池,這樣在流式應用的每個時間段內,不用建立連線,而且直接從連線池中取用,進一步提高了效率。
 
start()和awaitTermination()
streamingContext類的start()函式開始這個流式應用的執行,開始執行後,start()函式返回。呼叫awaitTermination(),driver將阻塞在這裡,直到流式應用意外退出。另外,通過呼叫stop()函式可以優雅退出流式應用,通過將傳入的stopSparkContext引數設定為false,可以只停止StreamingContext而不停止SparkContext(目前不知道這樣做的目的)。流式應用退出後,不可以通過呼叫start()函式再次啟動。
 
持久化操作
基於視窗的DStream操作、以及有狀態的變換(如上所述的updateStateByKey(func)),系統自動地把資料持久化到記憶體中,即隱含地呼叫了persist()操作。
eConteCheckpointing
Spark Streaming支援對兩種資料做checkpoint:
1.      元資料,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元資料被儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對driver失敗後的修復。
2.      流式資料,也是儲存到容忍失敗的儲存系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史資料都重新計算一遍(以便得到當前的狀態)。
 
如果要使用checkpoint機制,有幾個工作需要完成。
1.      修改之前的程式碼,如下所示。
// Function tocreate and setup a new StreamingContext
deffunctionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)   // new context
val lines = ssc.socketTextStream(...) //create DStreams
lines.checkpoint(…)                             //設定DStream做checkpoint的週期。
...
ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
ssc
}
 
// GetStreamingContext from checkpoint data or create a new one
val context =StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additionalsetup on context that needs to be done,
// irrespective ofwhether it is being started or restarted
context. ...
 
// Start thecontext
context.start()
context.awaitTermination()
         其中比較關鍵的操作是:使用StreamingContext.getOrCreate來建立StreamingContext物件,傳入的第一個引數是checkpoint的存放目錄,第二引數是生成StreamingContext物件的使用者自定義函式。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext物件;如果不存在,才會呼叫第二個函式來生成新的StreamingContext物件。
         在functionToCreateContext函式中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然後呼叫ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最後再返回StreamingContext物件。
         這樣,在StreamingContext.getOrCreate之後,就可以直接呼叫start()函式來啟動(或者是從中斷點繼續執行)流式應用了。如果有其他在啟動或繼續執行都要做的工作,可以在start()呼叫前執行。
 
2.      設定流式資料checkpoint的週期
         對於一個需要做checkpoint的DStream結構,可以通過呼叫DStream.checkpoint(checkpointInterval)來設定ckeckpoint的週期,經驗上一般將這個checkpoint週期設定成batch週期的5至10倍。
 
3.      確保driver可以自動重啟
重啟driver的方法已經在應用提交中提及。
 
4.      使用write ahead logs功能
這是一個可選功能,建議加上。在輸入RDD的Receiver中開啟這個功能將使得輸入資料寫入之前配置的checkpoint目錄。這樣有狀態的資料可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設定為true。
另外,由於輸入RDD的預設StorageLevel是MEMORY_AND_DISK_2,即資料會在兩臺worker上做replication。實際上,Spark Streaming模式下,任何從網路輸入資料的Receiver(如kafka、flume、socket)都會在兩臺機器上做資料備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在建立RDD時由引數傳入。
 
補充一點,使用以上的checkpoint機制,確實可以保證資料0丟失。但是一個前提條件是,資料傳送端必須要有快取功能,這樣才能保證在spark應用重啟期間,資料傳送端不會因為spark streaming服務不可用而把資料丟棄。
效能調優
對spark應用做效能調優主要包括以下幾個方面
1.      提高Receiver的併發度,具體的方法在上文中已經提及。
2.      調整Receiver的RDD資料分割槽時間隔
batch interval是接收一個batch資料的時間;而blockinterval則是接收一個block資料的時間。一個block的資料對應一個task。所以一個receiver接收一個batch資料後,執行簡單RDD操作需要的task數是:(batch interval/block interval)。所以減少block interval可以提高task數,即提高了併發度。
block interval通過spark.streaming.blockInterval這個property進行配置。最小值是50ms。通過ReceiverInputDStream.Repartition()也可以配置固定數目的分割槽。
3.      調整資料處理的併發度
reduceByKey 型別的操作,結果RDD的分割槽數可以通過引數傳入。否則其結果RDD分割槽數由spark.default.parallelism來決定。這個property的預設值是父RDD的分割槽數。
4.      調整資料序列化方式
5.      調整batch interval
Spark實現流式計算的方式,其實相當於把資料分割成很小的時間段,在每個小時間段內做Spark批量計算。所以,這個時間段的大小直接決定了流式系統的效能。如果設定的太大,時效性不好,如果設定的太小,很可能計算數率趕不上資料流入的速度。
一般決定合適的batch interval的方式是:先用較大的batch interval和較低的資料量執行流式應用,從web UI上觀察資料平均end-to-end時延,如果平穩且較低,則可以逐步減小batch interval或增大資料量,直至end-to-end時延平穩、小於batch interval並在一個等級上。這個時候一般是合適的batch interval。
6.      記憶體調優
DStream(非網路input stream)的預設StorageLevel是MEMORY_ONLY_SER,RDD的預設 StorageLevel是MEMORY_ONLY 。(待續)
Spark Streaming的資料可靠性
上文說過的ckeckpoint機制、write ahead log機制、Receiver快取機器、可靠的Receiver(即資料接收並備份成功後會傳送ackownage),可以保證無論是worker失效還是driver失效,都是資料0丟失。原因是:如果沒有Receiver服務的worker失效了,RDD資料可以依賴血統來重新計算;如果Receiver所在worker失敗了,由於Reciever是可靠的,並有write ahead log機制,則收到的資料可以保證不丟;如果driver失敗了,同理。