
An Analysis of Spark Streaming Dynamic Resource Allocation and Dynamic Rate Control

Why do we need dynamic behavior?
a) By default Spark allocates resources in a coarse-grained way: resources are reserved up front, before any computation runs. A Spark Streaming application, however, goes through peak and off-peak periods that require very different amounts of resources; if the cluster is sized for the peak, a large share of the resources sits idle during the off-peak periods.
b) A Spark Streaming application runs continuously, so long-term resource consumption and management also have to be taken into account.
Dynamic resource adjustment in Spark Streaming faces a challenge:
Spark Streaming executes batch by batch according to the Batch Duration. One batch may need a lot of resources while the next one needs far fewer, and by the time the resources have been adjusted, the batch that triggered the adjustment may already have finished. In that case the batch interval itself needs to be tuned (see the sketch below).
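For reference, the batch interval is fixed when the StreamingContext is created. The following is a minimal sketch, not taken from the article's code; the 30-second interval and the local master are purely illustrative values for experimentation.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The batch interval is fixed at StreamingContext creation time;
// 30 seconds and local[2] are illustrative values only.
val conf = new SparkConf()
  .setAppName("BatchIntervalExample")
  .setMaster("local[2]") // local master just for experimentation
val ssc = new StreamingContext(conf, Seconds(30))
// Input DStreams, transformations and output operations would be defined here
// before calling ssc.start() / ssc.awaitTermination().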

Spark Streaming dynamic resource allocation
1. Dynamic resource allocation is not enabled by default in SparkContext, but it can be turned on manually through SparkConf (a configuration sketch follows the source excerpt below).

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled &&
    // configuration flag that controls whether dynamic resource allocation is enabled
    _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
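
The excerpt above is the check inside SparkContext itself. Below is a minimal configuration sketch showing how an application might opt in. The property keys are real Spark settings, but the concrete values are only illustrative and should be tuned for the actual workload.

import org.apache.spark.SparkConf

// Minimal sketch for enabling dynamic allocation; values are illustrative.
val conf = new SparkConf()
  .setAppName("DynamicAllocationExample")
  .set("spark.dynamicAllocation.enabled", "true")
  // The external shuffle service is needed so executors can be removed safely.
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")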
2.  ExecutorAllocationManager: a timer keeps scanning the state of the executors and of the currently running stages, and based on that it either adds executors or removes them.
3.  The schedule method of ExecutorAllocationManager is triggered periodically to perform the dynamic resource adjustment (a simplified sketch of its expire-and-remove pattern follows the excerpt below).
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
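
To make the retain-based cleanup above easier to follow, here is a simplified, self-contained sketch of the same expire-and-remove pattern. The name removeWorker and the concrete timestamps are hypothetical; only the retain idiom (act on expired entries, keep the rest) mirrors the Spark code.

import scala.collection.mutable

object ExpireAndRemoveSketch {
  // Hypothetical stand-in for ExecutorAllocationManager.removeExecutor.
  def removeWorker(id: String): Unit = println(s"removing $id")

  def main(args: Array[String]): Unit = {
    // executorId -> time (ms) after which the executor may be removed
    val removeTimes = mutable.Map("exec-1" -> 1000L, "exec-2" -> 5000L)
    val now = 2000L

    // retain keeps only entries whose predicate returns true, so expired
    // entries are dropped from the map after the removal action has run.
    removeTimes.retain { case (id, expireTime) =>
      val expired = now >= expireTime
      if (expired) removeWorker(id)
      !expired
    }

    println(removeTimes) // only exec-2 remains
  }
}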
4.  Inside ExecutorAllocationManager, a timer running on a thread pool keeps invoking schedule at a fixed interval (a simplified sketch of this scheduling pattern follows the excerpt below).
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // intervalMillis is the firing interval of the timer
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
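
The following is a stripped-down sketch of the same fixed-rate scheduling pattern, using a plain java.util.concurrent scheduler instead of Spark's internal thread utilities; the interval and the task body are placeholders, not Spark's actual values.

import java.util.concurrent.{Executors, TimeUnit}

object FixedRateSchedulerSketch {
  def main(args: Array[String]): Unit = {
    // Single-threaded scheduler that fires a task at a fixed interval,
    // analogous to the timer that drives schedule() above.
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val intervalMillis = 100L // illustrative value only

    val task = new Runnable {
      override def run(): Unit = {
        try {
          println("schedule() would run here")
        } catch {
          case t: Throwable => println(s"uncaught exception: ${t.getMessage}")
        }
      }
    }

    scheduler.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)

    Thread.sleep(500) // let the task fire a few times, then shut down
    scheduler.shutdown()
  }
}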

Dynamic control of the consumption rate:
Spark Streaming provides an elastic mechanism (backpressure) that tracks the relationship between the rate at which data flows in and the rate at which it is processed, i.e., whether processing can keep up with ingestion. If it cannot keep up, Spark Streaming automatically and dynamically throttles the rate at which data is allowed to flow in; this behavior is controlled by the spark.streaming.backpressure.enabled parameter, as in the configuration sketch below.
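
A minimal sketch of how backpressure might be enabled; the property keys are real Spark Streaming settings, but the numeric values are illustrative only and should be tuned per job.

import org.apache.spark.SparkConf

// Minimal backpressure configuration sketch; values are illustrative.
val conf = new SparkConf()
  .setAppName("BackpressureExample")
  // Let the backpressure mechanism adapt the ingestion rate to the processing rate.
  .set("spark.streaming.backpressure.enabled", "true")
  // Initial rate for the first batch, before any processing statistics exist.
  .set("spark.streaming.backpressure.initialRate", "1000")
  // Hard upper bound per receiver (records per second), applied even with backpressure on.
  .set("spark.streaming.receiver.maxRate", "10000")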