Spark Streaming(二十七)DStream的轉換、輸出、快取持久化、檢查點
定義
所謂DStream
的轉換其實就是對間隔時間內DStream
資料流的RDD
進行轉換操作並返回去一個新的DStream
。
DStream轉換
其實DStream
轉換語法跟RDD
的轉換語法非常類似,但DStream
有它自己的一些特殊的語法,如updateStateByKey()、transform()
、以及各種Window
語法。
轉換 | 意思 |
---|---|
map(func) | 將DStream上的每個RDD通過func函式操作並返回一個新的DStream |
flatMap(func) | 和map類似,但是每個輸入的元素可以對映成0個或者多個項 |
filter(func) | 過濾出DStream上符合要求的RDD並返回新的DStream |
repartiton(numPartitions) | 對DStream上的RDD進行重新分割槽,提高並行度 |
union(otherDStream) | 將兩個DStream合併到一起 |
count() | 返回DStream中RDD中元素的總個數,返回只包含一個Long型別的DStream |
reduce(func) | 通過func函式聚合DStream中每個RDD中的每個元素,返回新的DStream,該新的DStream中只有一個元素,就是聚合以後的而結果 |
countByValue() | 計算DStream上RDD內元素出現的頻次,並返回新的DStream[(K,Long)],K是RDD元素的型別,Long是元素出現的次數 |
reduceByKey(func,[numPartition]) | 聚合DStream上(K,V)型別的RDD裡元素的,根據K統計V的個數,返回新的DStream,新的DStream裡的RDD元素型別也為(K,V),K為鍵,V為K對應值的個數 |
join(otherDStream,[numPartition]) | 將兩個型別為(K,V)和(K,W)的DStream進行join連線,返回一個型別為(K,(V,W))的新的DStream |
cogroup(otherDStream,[numPartition]) | 對兩個(K,V)和(K,W)型別的DStream上呼叫該函式的時候,返回(K,(Seq [V],Seq [W]))元組的新DStream。 |
transform(func) | 通過轉換函式,將DStream上的每個RDD轉換成另一種RDD,這種函式的操作基本單位為RDD,所以這個函式中的操作語法就是RDD的操作語法。轉換後返回新的DStream |
UpdateStateByKey操作
updateStateByKey
就是隨著時間的流逝,在SparkStreaming
中的可以對每一個Key
通過checkPoint
來維護state
狀態,通過更新函式對每一個Key
的狀態進行更新,在更新的時候,對於每一個批次的資料而言,SparkStreaming
都會通過updateStateByKey
這個函式更新已存在Key對應的State
,但是如果通過更新函式對State
更新以後返回的是None
,那麼此刻的Key
對應的State
就會被刪除,State
可以是任意型別的資料結構。
package com.lyz.streaming.transformation
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UpdateStateByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val streaming = new StreamingContext(conf, Seconds(5))
/**
*
* 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
* 都需要去檢查點獲取上一次更新後的state
*/
streaming.checkpoint("hdfs://xxxxx:8020/spark/checkpoint")
val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
/**
* 定義更新函式體
* value:就是新一批次的資料經過邏輯處理得到的結果
* res:新一批次的資料進來之前的狀態值
*/
def func(value: Seq[Int], res: Option[Int]): Some[Int] = {
val sum: Int = value.sum
val per: Int = res.getOrElse(0)
Some(sum + per)
}
//呼叫更新函式,傳入更新函式體
val res: DStream[(String, Int)] = mapStream.updateStateByKey(func)
res.foreachRDD(rdd => {
rdd.collect().foreach(println(_))
})
streaming.start()
streaming.awaitTermination()
}
}
Transform轉換操作
transform
操作允許DStream
上間隔時間內裡的RDD
轉換成另個RDD
。它的作用就是應用Dstream
為公開的RDD
操作函式。例如DStream
上一個新的批次RDD
與已存在的RDD
進行join
連線查詢的API
是未公開的,所以是不能這麼使用的,但是利用transform
函式就可以使用DStream
未公開API
。
package com.lyz.streaming.transformation
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object TransformTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val streaming = new StreamingContext(conf, Seconds(5))
/**
*
* 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
* 都需要去檢查點獲取上一次更新後的state
*/
streaming.checkpoint("hdfs://xxxx:8020/spark/checkpoint")
val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
//外部的RDD
val rdd: RDD[(String, Int)] = streaming.sparkContext.makeRDD(Array(("aaaa", 1)))
/**
* 呼叫DStream的transform函式,將DStream裡的RDD與外部RDD進行操作,返回新的DStream
* transform強大的原因就是能夠使用DStream為公開的一些RDD操作函式
*/
val res: DStream[(String, (Int, Int))] = mapStream.transform(rdd1 => {
rdd1.join(rdd)
})
res.foreachRDD(rdd => {
rdd.collect().foreach(println(_))
})
streaming.start()
streaming.awaitTermination()
}
}
Window視窗操作
SparkStreaming
提供了基於視窗的操作,你可以在滑動視窗內對資料進行轉換。這種視窗操作就是在比SparkStreaming
批處理的時間間隔更長的時間間隔內整合多個批處理的結果。
Window
視窗操作需要兩個引數,一個引數是視窗的時長(最近的幾個批次),另一個是視窗的滑動步長(多久執行一次計算)。需要特別注意的是這兩個引數一定要是批處理間隔時間的整數倍。例如我們有一個以10
秒為間隔的批處理,我們想要沒十秒計算一次30
秒內的資料,那麼我們就可以把時長視窗設定成30
秒,視窗的滑動步長為10
秒。
package com.lyz.streaming.transformation
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object WindowTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val streaming = new StreamingContext(conf, Seconds(5))
val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
/**
* 每十秒計算一次前三十秒的資料,其實也就是前6個批次的資料
*/
val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((w: Int, w1: Int) => w + w1, Seconds(30), Seconds(10))
}
}
比較常用的Window視窗函式
函式 | 解釋 |
---|---|
window(windowLength,slideInterval) | 建立滑動視窗,並返回一個新的DStream然後自定義函式處理這個滑動視窗的資料 |
reduceByKeyWindow(func,windowLength.slideInterval) | 對每個滑動視窗執行reduceByKey的操作 |
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numParition]) | 這是一個性能更好的reduceByKeyAndWindow函式, |
countByWindow(windowLength,slideInterval) | 對每一個滑動視窗執行count操作,返回每個窗口裡的元素個數 |
countByValueAndWindow(windowLength,slideInterval,[numPatition]) | 在每個滑動視窗內計算reduceByValue操作,統計每個鍵對應的值在視窗內出現的頻次 |
DStream的操作結果輸出
操作DStream的輸出結果可以推送到外部資料庫中或者外的檔案系統中。
函式 | 解釋 |
---|---|
print() | 在Driver端列印DStream上的前十個元素 |
saveAsTextFiles(prefix,[suffix]) | 儲存DStream內容到文字檔案中,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]" |
saveAsObjectFiles(prefix,[suffix]) | 儲存DStream內容為Sequence檔案,這檔案是Java物件序列化後的,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]" |
saveAsHadoopFiles(prefix,[suffix]) | 儲存DStream內容到Hadoop檔案系統中,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]" |
foreachRDD(func) | 迴圈DStream上的RDD,並且將函式應用到每個RDD上,並且這個函式可以將RDD推送到外部系統中,注意這個函式時在Driver端執行的。它內部的RDD的操作是在每個Woker分割槽上執行的 |
foreachRDD應用設計
foreachRDD
幾首一個函式,通過這個函式我可以把RDD
儲存到外部系統中,既然要儲存到外部系統就需要與外部系統建立連線,我們都知道RDD
的資料是散落在各個woker
上的,處理RDD
的函式也是在各個worker
上進行的,我們需要將資料儲存到外部資料,那麼我們就必須在每個worker
上都存在一個連線,因為連線是不能被序列化的,所以這個連線肯定是不能由driver
傳送到worker
上的,而是在worker
上創建出來。那麼怎麼樣才能在worker
上建立連線呢?那就是在rdd.foreach
程式碼塊裡建立連線,而不是在stream.foreachRDD
程式碼塊裡建立。
res.foreachRDD(rdd => {
rdd.foreach(r => {
//1、建立連線
//2、儲存資料
//3、關閉連線
})
})
我們都知道遍歷RDD的時候其實就是在遍歷每一條記錄,如上程式碼所示我們為每一條記錄都建立了連線,顯然這樣是非常影響系統性能的,那麼我們怎麼辦呢?既然資料是在各個worker的分割槽上的,那麼我們可以為每個分割槽建立一個連線。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
DataFrame和SQL操作DStream
在處理DStream資料流的時候,我們可以利用DataFrame和SQL操作DStream。利用DataFrame和SQL處理DStream的前提是必須利用初始化Streaming的SparkConf來建立SparkSession。例子如下
package com.lyz.streaming.transformation
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object DateFrameAndSqlTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val streaming = new StreamingContext(conf, Seconds(5))
/**
*
* 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
* 都需要去檢查點獲取上一次更新後的state
*/
streaming.checkpoint("hdfs://192.168.101.187:8020/spark/checkpoint")
val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
mapStream.foreachRDD(rdd => {
//利用SparkConf來初始化SparkSession。
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//匯入隱式轉換來將RDD
import sparkSession.implicits._
//將RDD轉換成DF
val df: DataFrame = rdd.toDF("word", "count")
df.createOrReplaceTempView("compute")
val computeDF: DataFrame = sparkSession.sql("select * from compute")
computeDF.show()
})
streaming.start()
streaming.awaitTermination()
}
}
DStream快取和持久化
與RDD快取和持久化類似,我們在程式中可以把程式中將要被多次計算的DStream快取到記憶體中,也就是對相同資料進行多次計算。呼叫DStream的persist方法來實現DStream的快取。對於視窗函式的操作這些底層已經對資料進行快取了,因此視窗函式生成的DStream已經儲存在了記憶體中,開發人員無需呼叫persist方法。
對於通過網路傳輸的的資料流,預設的持久化是將資料複製到兩個節點上進行備份實現容錯
注意:與RDD不同,DStream的預設持久化級別是將資料持久化儲存在記憶體中。
DStream的檢查點
由於流式處理程式需要全天不間斷的執行,執行期間有很大機率會出現程式故障,為了應用程式的故障容錯性,Streaming設定了檢查點功能,將足夠的資訊儲存到檢查點中,以便對系統故障進行恢復。Streaming可以對兩種資料進行checkPoint。
- 元資料檢查點:將定義流式計算資訊儲存到檢查點上(HDFS目錄),用於恢復應用程式的驅動程式的故障。其中應用程式的元資料資訊包含建立流式應用的配置、定義流式應用程式的操作集、部分未完成的批次
- 資料檢查點:將生成的RDD儲存到檢查點上 。在應用程式中跨多個批次的RDD的具有有狀態的轉換中,這種檢查點必須設定,例如視窗操作和有狀態更新。因為在多個批次組合生成一個RDD的時候,結果RDD會依賴先前批次的RDD,這樣如果批次都的話,就會形成很多的依賴關係,設定RDD檢查點就是為了切斷依賴關係。
合適設定檢查點
有狀態操作的轉換,如果在應用程式中只用了例如updateStateByKey侯喆是reduceByKeyAndWindow,就必須要設定RDD的檢查點。
恢復故障的應用驅動程式 ,應用程式的元資料檢查點是儲存應用程式當前進度資訊的,所以驅動程式故障恢復就是恢復當前應用程式執行的進度。
如何配置檢查點
- 配置資料檢查點,在執行DStream有狀態的操作的時候,必須設定檢查點,設定檢查點只需要呼叫StreamingContext.checkPoint(“hdfs://hadoop001:8020/sparkStreaming/checkPoint”)並傳入HDFS上的目錄,這樣就可以將上次執行的結果RDD儲存到該目錄下,供下個批次與該資料進行整合。
- 應用元資料資訊檢查點。在應用程式第一次啟動的時候,會建立SparkContext,並呼叫start和awaitTermination方法開始流失資料的處理,當應用程式啟動失敗的時候,驅動程式會從檢查點上恢復SparkContext。
package com.lyz.streaming.checkpoint
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
object MetaStoreCheckPointTest {
def main(args: Array[String]): Unit = {
//設定檢查點目錄
val checkPointDir = "hdfs://192.168.101.187:8020/spark/checkpoint"
//從檢查點上得到StreamingContext,如果檢查點上沒有StreamingContext那麼就建立一個新的StreamingContext。
val streaming: StreamingContext = StreamingContext.getOrCreate(checkPointDir, () => createContext(checkPointDir))
streaming.start()
streaming.awaitTermination()
}
def createContext(checkPointDir: String): StreamingContext = {
//建立SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//建立StreamingContext
val streaming = new StreamingContext(conf, Seconds(5))
/**
*
* 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
* 都需要去檢查點獲取上一次更新後的state
*/
streaming.checkpoint(checkPointDir)
val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
res.foreachRDD(_.foreach(println(_)))
streaming
}
}