1. 程式人生 > >spark streaming被壓分析

spark streaming被壓分析

在我們使用spark-streaming處理實時資料時,通常在Dstream端的rdd操作較為耗時,此刻的實時資料還在receiver端不斷的store。由於資料的處理不及時,即Processing Time < blockInterval也就造成了資料的積壓。此時就需要一種機制來解決receiver端store資料的“速率”。在spark streaming中就是被壓(backpressure);

簡單使用

開啟被壓引數

spark.streaming.backpressure.enabled=true

此引數會開啟spark streaming內部的被壓機制(1.5以上版本),開始後spark streaming會根據當前處理批次的scheduling delays(batch排程延遲時間)和 processing times(batch處理時間)控制receiver端的接受速率,以達到和資料的處理速度一樣快。設定的接收速率受spark.streaming.receiver.maxRate

引數的影響。

設定初始處理速率

spark.streaming.backpressure.initialRate=xxx

此引數會在receiver接收第一批(first batch)資料時初始化的最大速率,此引數只會在被壓引數開啟時有效。
設定此引數可以在啟動spark streaming程式的瞬間就達到我們期望的最大值,而不是靠被壓引數慢慢調整。

設定最小處理速率

spark.streaming.backpressure.pid.minRate=x

此引數在spark streaming中預設值為100.如果我們store的資料為一個集合,那麼允許的最小速率就是100集合的資料,此時資料量可能也會很大。所以最好設定一個初始值。比如1.

設定最大處理速率

spark.streaming.receiver.maxRate=xxx

每個receiver接收資料的最大速率,每個dstream最大隻能消耗這麼多的資料。設定為0或者負數將不做限制。

此引數一般不做設定,除非你的機器上還有其它程式。

被壓原理

我們就從receiver端的store方法開始

 /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

store方法中的supervisor物件型別為ReceiverSupervisorImpl 所以直接進入ReceiverSupervisorImpl實現類中

  /** Push a single record of received data into block generator. */
  def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }

defaultBlockGeneratoraddData方法內容為

  /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
    if (state == Active) {
     //等待push
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

被壓機制的實現就在waitToPush方法中。點進去檢視


  private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)


  def waitToPush() {
    //從令牌桶中取令牌
    rateLimiter.acquire()
  }

  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

  private def getInitialRateLimit(): Long = {
    math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
  }

仔細檢視rateLimiter物件,我們會方向這個物件就是使用Guava的開源工具包RateLimiter實現的,如果想了解rateLimiter原理的,可以google搜尋,一大堆。
有人可能說rateLimitersemphore很像,其實semphore是控制併發,而rateLimiter控制速率,儘管速率和併發很像。(具體參考:https://en.wikipedia.org/wiki/Little’s_law)

getInitialRateLimit方法我們可以看出rateLimiter的初始值為spark.streaming.backpressure.initialRate,如果沒有設定預設為最大速率spark.streaming.receiver.maxRate
GuavaRateLimiter.create(getInitialRateLimit().toDouble)方法會建立一個每秒令牌數為初始設定的令牌桶。acquire方法就是從桶中取令牌。

細心的你可能發現還有個updateRate方法,此方法會更新每秒能獲得的最大令牌數。