spark解決方案系列--------1.spark-streaming實時Join儲存在HDFS大量資料的解決方案
spark-streaming實時接收資料並處理。一個非常廣泛的需求是spark-streaming實時接收的資料需要跟儲存在HDFS上的大量資料進行Join。要實現這個需求保證實時性需要解決以下幾個問題:
1.spark-streaming的資料接收間隔往往很小,比如只有幾秒鐘。HDFS上的資料往往很大,不能每個batch都從HDFS讀取資料,避免頻繁大量磁碟I/O。HDFS的資料也可能會改變,只是改變後資料載入週期比spark-streaming的batch時間要長。
2.Cache到記憶體的資料,不能在streaming的一個batch處理結束之後被回收。
3.HDFS大量資料在跟Kafka等實時接收的資料進行Join的時候不能shuffle。如果發生shuffle,由於HDFS中資料非常大,勢必會影響實時性。
4.HDFS載入到記憶體中的資料,不能頻繁Checkpoint到磁碟。
一個Spark-streaming application可以建立多個InputDStream,但是所有的InputDStream的資料接收時間間隔必須相同,因為資料接收間隔設定在了StreamingContext上。因此為了解決上面提到的第一個問題,需要實現一個自定義InputDStream。這個InputDStream需要將從HDFS上讀取的資料Cache到記憶體,並且將Cache到記憶體的資料從前一個DStream傳遞到下一個DStream。
Spark-streaming在每個batch建立了RDD之後,如果DStream的StorageLevel不為None會設定DStream的RDD的StorageLevel,原始碼如下:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { // If RDD was already generated, then retrieve it from HashMap, // or else compute the RDD generatedRDDs.get(time).orElse { // Compute the RDD if time is valid (e.g. correct time in a sliding window) // of RDD generation, else generate nothing. if (isTimeValid(time)) { val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) { // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. We need to have this call here because // compute() might cause Spark jobs to be launched. PairRDDFunctions.disableOutputSpecValidation.withValue(true) { compute(time) } } rddOption.foreach { case newRDD => // Register the generated RDD for caching and checkpointing if (storageLevel != StorageLevel.NONE) {//設定DStream對應RDD的儲存級別 newRDD.persist(storageLevel) logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel") } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") } generatedRDDs.put(time, newRDD) } rddOption } else { None } } }
因此設定自定義InputDStream的StorageLevel為MEMORY_ONLY或者MEMORY_ONLY_SER可以解決HDFS資料cache到記憶體的問題。相關原始碼為:
storageLevel = StorageLevel.MEMORY_ONLY
一個Spark-streaming batch處理結束之後,會發送ClearMetadata事件來清除這個batch的資料,具體原始碼如下:
private[streaming] def clearMetadata(time: Time) {
val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))//清除remeberDuration時間間隔之前的RDD
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
logInfo("Removing blocks of RDD " + b + " of time " + time)
b.removeBlocks()//將RDD佔用的記憶體塊從BlockManager釋放
case _ =>
}
}
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
}
從上面的程式碼可知,當一個batch處理結束之後,會將remeberDuration時間間隔之前的RDD刪除,並且將這個RDD在BlockManager中佔用的記憶體塊釋放。所以通過設定自定義InputDStream的remeberDuration來防止一個batch產生的RDD馬上被釋放。
Spark-streaming每個batch都會產生RDD,為了避免每個Batch都從HDFS載入檔案產生RDD,需要將HDFS中大檔案產生的RDD在前後的DStream進行傳遞,因此在自定義InputDStream中定義一個成員變數,記錄已經生成的RDD,並且這個成員變數只有在rememberDuration時間間隔才會發生改變。自定義InputDStream的compute方法定義為:
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
// Find new files
if (rddGenerateTime == 0 || Duration(validTime.milliseconds - rddGenerateTime) >= durationToRemember){//從HDFS讀取資料生成的RDD的週期為durationToRemember
val newFiles = findNewFiles(validTime.milliseconds)//這個方法會載入目錄裡面的所有檔案
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
val rdds = Some(filesToRDD(newFiles))
rddGenerateTime = validTime.milliseconds//記錄從HDFS讀取資料的時間
val metadata = Map(
"files" -> newFiles.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
val inputInfo = StreamInputInfo(id, 0, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
rememberRDDs = rdds//記錄從HDFS讀取資料生成的RDD
rdds
}else {
/*
這樣在DStream.getOrCompute方法裡面會把這個RDD重複的放入generateRDDs這個HashMap裡面.不過沒有關係,因為generateRDDs裡面的RDD
在超過rememberDuration之後會將RDD從generateRDDs裡面刪除,並且將RDD所佔用的記憶體塊也刪除。在實際執行的時候不會發生重複刪除記憶體塊,因為刪除記憶體塊之前會先檢查記憶體塊是否存在
第一次刪除之後,
以後再刪除的時候發現記憶體塊已經不存在了,會直接返回,不會重複刪除
*/
//useRememberRDDTime = validTime.milliseconds
rememberRDDs
}
}
在上面的方法中,從HDFS讀取資料生成的RDD的週期為durationToRemember,並且從HDFS讀取資料生成的RDD儲存在了自定義InputDStream的remeberRDDs成員。如果時間還沒有達到durationToRemember則將remeberRDDs中儲存的RDD作為這個batch產生的HadoopRDD,這樣達到了RDD在前後DStream的傳遞,避免了頻繁的讀取HDFS資料。另外這個自定義DStream是仿照FileInputDStream類寫的,它借鑑了FileInputDStream.compute方法對目錄檔案的監控,在一個durationToRemember週期載入一次監控目錄的所有檔案產生RDD。
最終的實現效果如下圖所示:
在上圖中,spark-streaming DStream的產生batch是2秒,但是NewHadoopRDD 1天才生成一次,並且Cache到了記憶體。在1天的remeberDuration時間間隔內不同的MyInputDSteam使用相同的NewHadoopRDD.
為了避免進行join的時候發生shuffle,將Kafka DStream每個batch的資料broadcast,進行join。測試程式碼如下:
kafkaMessages.transformWith[(String, String), (String, String, String)](hadoopMessages, (kafkaRDD:RDD[(String, String)], fileRDD:RDD[(String, String)]) =>{
val sqlContext = SQLContextSingleton.getInstance(kafkaRDD.sparkContext)
import sqlContext.implicits._
val kafkaDf = kafkaRDD.map{case (ip, str) =>KafkaObj(ip, str)}.toDF
//kafkaDf.registerTempTable("kafkatable")
val fileDf = fileRDD.map{case (ip, str) =>FileObj(ip, str)}.toDF
fileDf.join(broadcast(kafkaDf), "ip").rdd.map(row =>{
(row.get(0).toString, row.get(1).toString, row.get(2).toString)
})
})
自定義InputDStream的filesToRDD 方法將所有檔案轉化成RDD,原始碼如下:
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map { file =>
val rdd = serializableConfOpt.map(_.value) match {
case Some(config) => {
context.sparkContext.newAPIHadoopFile(
file,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
config)
.map { case (k, v) =>
val newKey = new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
(newKey, newText)//因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體
}
}
case None => {
context.sparkContext.newAPIHadoopFile[K, V, F](file)
.map{case (k, v) =>
val newKey =new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
(newKey, newText)<span style="font-family: Arial, Helvetica, sans-serif;">//因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體</span>
}
}
}
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
rdd
}
new UnionRDD(context.sparkContext, fileRDDs)
}
因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體
從上面的方法知道,filesToRDD返回的RDD並不是根RDD(根據NewHadoopRDD建立了UnionRDD),這個RDD也是自定義InputDStream的compute方法返回的RDD。
spark-streaming的DSteam.getOrCompute方法將compute返回的RDD checkpoint到HDFS,原始碼如下:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()//將DStream的compute方法產生的RDD checkpoint到HDFS
logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}
如果設定了checkpoint,則將DStream的compute方法產生的RDD checkpoint到HDFS。
這個UnionRDD cache到了記憶體,但是如果發生checkpoint,會把這個UnionRDD寫到磁碟,這樣也會引起大量的磁碟I/O,為了解決這個問題,這個解決方案不能設定checkpoint。如果需要checkpoint,需要將KafkaDirectDStream每個Kafka分割槽的偏移量同步到zookeeper,每次程式重啟的時候從zookeeper獲取KafkaDirectDStream上一次讀取到的位置。