1. 程式人生 > >spark解決方案系列--------1.spark-streaming實時Join儲存在HDFS大量資料的解決方案

spark解決方案系列--------1.spark-streaming實時Join儲存在HDFS大量資料的解決方案

    spark-streaming實時接收資料並處理。一個非常廣泛的需求是spark-streaming實時接收的資料需要跟儲存在HDFS上的大量資料進行Join。要實現這個需求保證實時性需要解決以下幾個問題:

1.spark-streaming的資料接收間隔往往很小,比如只有幾秒鐘。HDFS上的資料往往很大,不能每個batch都從HDFS讀取資料,避免頻繁大量磁碟I/O。HDFS的資料也可能會改變,只是改變後資料載入週期比spark-streaming的batch時間要長。

2.Cache到記憶體的資料,不能在streaming的一個batch處理結束之後被回收。

3.HDFS大量資料在跟Kafka等實時接收的資料進行Join的時候不能shuffle。如果發生shuffle,由於HDFS中資料非常大,勢必會影響實時性。

4.HDFS載入到記憶體中的資料,不能頻繁Checkpoint到磁碟。

    一個Spark-streaming application可以建立多個InputDStream,但是所有的InputDStream的資料接收時間間隔必須相同,因為資料接收間隔設定在了StreamingContext上。因此為了解決上面提到的第一個問題,需要實現一個自定義InputDStream。這個InputDStream需要將從HDFS上讀取的資料Cache到記憶體,並且將Cache到記憶體的資料從前一個DStream傳遞到下一個DStream。

    Spark-streaming在每個batch建立了RDD之後,如果DStream的StorageLevel不為None會設定DStream的RDD的StorageLevel,原始碼如下:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {//設定DStream對應RDD的儲存級別
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

因此設定自定義InputDStream的StorageLevel為MEMORY_ONLY或者MEMORY_ONLY_SER可以解決HDFS資料cache到記憶體的問題。相關原始碼為:

storageLevel = StorageLevel.MEMORY_ONLY


    一個Spark-streaming batch處理結束之後,會發送ClearMetadata事件來清除這個batch的資料,具體原始碼如下:

private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))//清除remeberDuration時間間隔之前的RDD
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist(false)
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo("Removing blocks of RDD " + b + " of time " + time)
            b.removeBlocks()//將RDD佔用的記憶體塊從BlockManager釋放
          case _ =>
        }
      }
    }
    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
    dependencies.foreach(_.clearMetadata(time))
  }

從上面的程式碼可知,當一個batch處理結束之後,會將remeberDuration時間間隔之前的RDD刪除,並且將這個RDD在BlockManager中佔用的記憶體塊釋放。所以通過設定自定義InputDStream的remeberDuration來防止一個batch產生的RDD馬上被釋放。

    Spark-streaming每個batch都會產生RDD,為了避免每個Batch都從HDFS載入檔案產生RDD,需要將HDFS中大檔案產生的RDD在前後的DStream進行傳遞,因此在自定義InputDStream中定義一個成員變數,記錄已經生成的RDD,並且這個成員變數只有在rememberDuration時間間隔才會發生改變。自定義InputDStream的compute方法定義為:

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    // Find new files
    if (rddGenerateTime == 0 || Duration(validTime.milliseconds -  rddGenerateTime) >= durationToRemember){//從HDFS讀取資料生成的RDD的週期為durationToRemember
      val newFiles = findNewFiles(validTime.milliseconds)//這個方法會載入目錄裡面的所有檔案
      logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
      val rdds = Some(filesToRDD(newFiles))
      rddGenerateTime = validTime.milliseconds//記錄從HDFS讀取資料的時間
      val metadata = Map(
        "files" -> newFiles.toList,
        StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
      val inputInfo = StreamInputInfo(id, 0, metadata)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      rememberRDDs = rdds//記錄從HDFS讀取資料生成的RDD
      rdds
    }else {
      /*
      這樣在DStream.getOrCompute方法裡面會把這個RDD重複的放入generateRDDs這個HashMap裡面.不過沒有關係,因為generateRDDs裡面的RDD
      在超過rememberDuration之後會將RDD從generateRDDs裡面刪除,並且將RDD所佔用的記憶體塊也刪除。在實際執行的時候不會發生重複刪除記憶體塊,因為刪除記憶體塊之前會先檢查記憶體塊是否存在
      第一次刪除之後,
      以後再刪除的時候發現記憶體塊已經不存在了,會直接返回,不會重複刪除
      */
      //useRememberRDDTime = validTime.milliseconds
      rememberRDDs

    }

  }

在上面的方法中,從HDFS讀取資料生成的RDD的週期為durationToRemember,並且從HDFS讀取資料生成的RDD儲存在了自定義InputDStream的remeberRDDs成員。如果時間還沒有達到durationToRemember則將remeberRDDs中儲存的RDD作為這個batch產生的HadoopRDD,這樣達到了RDD在前後DStream的傳遞,避免了頻繁的讀取HDFS資料。另外這個自定義DStream是仿照FileInputDStream類寫的,它借鑑了FileInputDStream.compute方法對目錄檔案的監控,在一個durationToRemember週期載入一次監控目錄的所有檔案產生RDD。

    最終的實現效果如下圖所示:


在上圖中,spark-streaming DStream的產生batch是2秒,但是NewHadoopRDD 1天才生成一次,並且Cache到了記憶體。在1天的remeberDuration時間間隔內不同的MyInputDSteam使用相同的NewHadoopRDD.

為了避免進行join的時候發生shuffle,將Kafka DStream每個batch的資料broadcast,進行join。測試程式碼如下:

kafkaMessages.transformWith[(String, String), (String, String, String)](hadoopMessages, (kafkaRDD:RDD[(String, String)], fileRDD:RDD[(String, String)]) =>{
     val sqlContext = SQLContextSingleton.getInstance(kafkaRDD.sparkContext)
     import sqlContext.implicits._
     val kafkaDf = kafkaRDD.map{case (ip, str) =>KafkaObj(ip, str)}.toDF
     //kafkaDf.registerTempTable("kafkatable")
     val fileDf = fileRDD.map{case (ip, str) =>FileObj(ip, str)}.toDF
     fileDf.join(broadcast(kafkaDf), "ip").rdd.map(row =>{
       (row.get(0).toString, row.get(1).toString, row.get(2).toString)
     })
   })

自定義InputDStream的filesToRDD 方法將所有檔案轉化成RDD,原始碼如下:
 private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = files.map { file =>
      val rdd = serializableConfOpt.map(_.value) match {
        case Some(config) => {
          context.sparkContext.newAPIHadoopFile(
            file,
            fm.runtimeClass.asInstanceOf[Class[F]],
            km.runtimeClass.asInstanceOf[Class[K]],
            vm.runtimeClass.asInstanceOf[Class[V]],
            config)
            .map { case (k, v) =>
              val newKey = new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
              val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
              (newKey, newText)//因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體

          }
        }
        case None => {
          context.sparkContext.newAPIHadoopFile[K, V, F](file)
          .map{case (k, v) =>
            val newKey =new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
            val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
            (newKey, newText)<span style="font-family: Arial, Helvetica, sans-serif;">//因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體</span>


          }
        }
      }
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
    new UnionRDD(context.sparkContext, fileRDDs)
  }

因為NewHadoopRDD每個元素都佔用了同一塊記憶體,所以必須複製每個元素的k、v才能將這個RDD cache到記憶體

從上面的方法知道,filesToRDD返回的RDD並不是根RDD(根據NewHadoopRDD建立了UnionRDD),這個RDD也是自定義InputDStream的compute方法返回的RDD。

spark-streaming的DSteam.getOrCompute方法將compute返回的RDD checkpoint到HDFS,原始碼如下:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()//將DStream的compute方法產生的RDD checkpoint到HDFS
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

如果設定了checkpoint,則將DStream的compute方法產生的RDD checkpoint到HDFS。

這個UnionRDD cache到了記憶體,但是如果發生checkpoint,會把這個UnionRDD寫到磁碟,這樣也會引起大量的磁碟I/O,為了解決這個問題,這個解決方案不能設定checkpoint。如果需要checkpoint,需要將KafkaDirectDStream每個Kafka分割槽的偏移量同步到zookeeper,每次程式重啟的時候從zookeeper獲取KafkaDirectDStream上一次讀取到的位置。